Kafka Java客户端代码的示例分析
发表于:2025-11-16 作者:千家信息网编辑
千家信息网最后更新 2025年11月16日,这篇文章将为大家详细讲解有关Kafka Java客户端代码的示例分析,文章内容质量较高,因此小编分享给大家做个参考,希望大家阅读完这篇文章后对相关知识有一定的了解。kafka是一种高吞吐量的分布式发布
千家信息网最后更新 2025年11月16日Kafka Java客户端代码的示例分析
这篇文章将为大家详细讲解有关Kafka Java客户端代码的示例分析,文章内容质量较高,因此小编分享给大家做个参考,希望大家阅读完这篇文章后对相关知识有一定的了解。
kafka是一种高吞吐量的分布式发布订阅消息系统
kafka是linkedin用于日志处理的分布式消息队列,linkedin的日志数据容量大,但对可靠性要求不高,其日志数据主要包括用户行为(登录、浏览、点击、分享、喜欢)以及系统运行日志(CPU、内存、磁盘、网络、系统及进程状态)
当前很多的消息队列服务提供可靠交付保证,并默认是即时消费(不适合离线)。
高可靠交付对linkedin的日志不是必须的,故可通过降低可靠性来提高性能,同时通过构建分布式的集群,允许消息在系统中累积,使得kafka同时支持离线和在线日志处理
测试环境
kafka_2.10-0.8.1.1 3个节点做的集群
zookeeper-3.4.5 一个实例节点
代码示例
消息生产者代码示例
import java.util.Collections; import java.util.Date; import java.util.Properties; import java.util.Random; import kafka.javaapi.producer.Producer; import kafka.producer.KeyedMessage; import kafka.producer.ProducerConfig; /** * 详细可以参考:https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+Producer+Example * @author Fung * */ public class ProducerDemo { public static void main(String[] args) { Random rnd = new Random(); int events=100; // 设置配置属性 Properties props = new Properties(); props.put("metadata.broker.list","172.168.63.221:9092,172.168.63.233:9092,172.168.63.234:9092"); props.put("serializer.class", "kafka.serializer.StringEncoder"); // key.serializer.class默认为serializer.class props.put("key.serializer.class", "kafka.serializer.StringEncoder"); // 可选配置,如果不配置,则使用默认的partitioner props.put("partitioner.class", "com.catt.kafka.demo.PartitionerDemo"); // 触发acknowledgement机制,否则是fire and forget,可能会引起数据丢失 // 值为0,1,-1,可以参考 // http://kafka.apache.org/08/configuration.html props.put("request.required.acks", "1"); ProducerConfig config = new ProducerConfig(props); // 创建producer Producer producer = new Producer(config); // 产生并发送消息 long start=System.currentTimeMillis(); for (long i = 0; i < events; i++) { long runtime = new Date().getTime(); String ip = "192.168.2." + i;//rnd.nextInt(255); String msg = runtime + ",www.example.com," + ip; //如果topic不存在,则会自动创建,默认replication-factor为1,partitions为0 KeyedMessage data = new KeyedMessage( "page_visits", ip, msg); producer.send(data); } System.out.println("耗时:" + (System.currentTimeMillis() - start)); // 关闭producer producer.close(); } } 消息消费者代码示例
import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import kafka.consumer.Consumer; import kafka.consumer.ConsumerConfig; import kafka.consumer.KafkaStream; import kafka.javaapi.consumer.ConsumerConnector; /** * 详细可以参考:https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example * * @author Fung * */ public class ConsumerDemo { private final ConsumerConnector consumer; private final String topic; private ExecutorService executor; public ConsumerDemo(String a_zookeeper, String a_groupId, String a_topic) { consumer = Consumer.createJavaConsumerConnector(createConsumerConfig(a_zookeeper,a_groupId)); this.topic = a_topic; } public void shutdown() { if (consumer != null) consumer.shutdown(); if (executor != null) executor.shutdown(); } public void run(int numThreads) { Map topicCountMap = new HashMap(); topicCountMap.put(topic, new Integer(numThreads)); Map>> consumerMap = consumer .createMessageStreams(topicCountMap); List> streams = consumerMap.get(topic); // now launch all the threads executor = Executors.newFixedThreadPool(numThreads); // now create an object to consume the messages // int threadNumber = 0; for (final KafkaStream stream : streams) { executor.submit(new ConsumerMsgTask(stream, threadNumber)); threadNumber++; } } private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) { Properties props = new Properties(); props.put("zookeeper.connect", a_zookeeper); props.put("group.id", a_groupId); props.put("zookeeper.session.timeout.ms", "400"); props.put("zookeeper.sync.time.ms", "200"); props.put("auto.commit.interval.ms", "1000"); return new ConsumerConfig(props); } public static void main(String[] arg) { String[] args = { "172.168.63.221:2188", "group-1", "page_visits", "12" }; String zooKeeper = args[0]; String groupId = args[1]; String topic = args[2]; int threads = Integer.parseInt(args[3]); ConsumerDemo demo = new ConsumerDemo(zooKeeper, groupId, topic); demo.run(threads); try { Thread.sleep(10000); } catch (InterruptedException ie) { } demo.shutdown(); } } 消息处理类
import kafka.consumer.ConsumerIterator; import kafka.consumer.KafkaStream; public class ConsumerMsgTask implements Runnable { private KafkaStream m_stream; private int m_threadNumber; public ConsumerMsgTask(KafkaStream stream, int threadNumber) { m_threadNumber = threadNumber; m_stream = stream; } public void run() { ConsumerIterator it = m_stream.iterator(); while (it.hasNext()) System.out.println("Thread " + m_threadNumber + ": " + new String(it.next().message())); System.out.println("Shutting down Thread: " + m_threadNumber); } } Partitioner类示例
import kafka.producer.Partitioner; import kafka.utils.VerifiableProperties; public class PartitionerDemo implements Partitioner { public PartitionerDemo(VerifiableProperties props) { } @Override public int partition(Object obj, int numPartitions) { int partition = 0; if (obj instanceof String) { String key=(String)obj; int offset = key.lastIndexOf('.'); if (offset > 0) { partition = Integer.parseInt(key.substring(offset + 1)) % numPartitions; } }else{ partition = obj.toString().length() % numPartitions; } return partition; } }关于Kafka Java客户端代码的示例分析就分享到这里了,希望以上内容可以对大家有一定的帮助,可以学到更多知识。如果觉得文章不错,可以把它分享出去让更多的人看到。
消息
示例
日志
代码
系统
参考
分布式
数据
处理
配置
客户
客户端
分析
内容
可靠性
同时
文章
更多
知识
篇文章
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
软件开发团队建设实际方法
php是 用的什么软件开发
虚拟服务器好处
甲骨文数据库和中国数据库
软件开发一年最多能赚多少钱
设备编程用什么软件开发
云服务器怎么设置更安全
数据库天数加1
个人租服务器可以用来挖矿吗
开箱是啥意思软件开发
数据库删除表数据空间
网络安全文明同行的内容
php成都软件开发工资
服务器回复offer报文很慢
rd440服务器故障灯
软件开发与测试最好的大学
网络安全研究的目的及意义
数据库应用与开发作用是什么
联想服务器驱动管理
数据库运维要学哪些
erp软件开发哪家好用
如何连接打印机服务器
网络安全概念股早盘拉升
融达互联数据库配置
app服务器怎么管理
声卡下载软件开发
软件开发运营部招人
mc服务器地皮
数据库如何给表增加新的列
数据库活动图