如何解析Kafka 1.0.0 多消费者示例
发表于:2025-12-02 作者:千家信息网编辑
千家信息网最后更新 2025年12月02日,如何解析Kafka 1.0.0 多消费者示例,相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。package kafka.demo;impo
千家信息网最后更新 2025年12月02日如何解析Kafka 1.0.0 多消费者示例
如何解析Kafka 1.0.0 多消费者示例,相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。
package kafka.demo;import java.util.HashMap;import java.util.Map;import org.apache.kafka.clients.producer.KafkaProducer;import org.apache.kafka.clients.producer.ProducerRecord;/** * *Description: kafka 1.0.0
* @author guangshihao * @date 2018年9月19日 * */public class KafkaProduderDemo { public static void main(String[] args) { Mapprops = new HashMap<>(); /* * acks,设置发送数据是否需要服务端的反馈,有三个值0,1,-1 * 0,意味着producer永远不会等待一个来自broker的ack,这就是0.7版本的行为。 * 这个选项提供了最低的延迟,但是持久化的保证是最弱的,当server挂掉的时候会丢失一些数据。 * 1,意味着在leader replica已经接收到数据后,producer会得到一个ack。 * 这个选项提供了更好的持久性,因为在server确认请求成功处理后,client才会返回。 * 如果刚写到leader上,还没来得及复制leader就挂了,那么消息才可能会丢失。 * -1,意味着在所有的ISR都接收到数据后,producer才得到一个ack。 * 这个选项提供了最好的持久性,只要还有一个replica存活,那么数据就不会丢失 */ props.put("acks", "1"); //配置默认的分区方式 props.put("partitioner.class", "org.apache.kafka.clients.producer.internals.DefaultPartitioner"); //配置topic的序列化类 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); //配置value的序列化类 props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); /* * kafka broker对应的主机,格式为host1:port1,host2:port2 */ props.put("bootstrap.servers", "bigdata01:9092,bigdata02:9092,bigdata03:9092"); //topic String topic = "test7"; KafkaProducer< String, String> producer = new KafkaProducer< String, String>(props); for(int i = 1 ;i <= 100 ; i++) { String line = i+" this is a test "; ProducerRecord record = new ProducerRecord (topic,line ); producer.send(record); } producer.close(); }}
package kafka.demo;import java.util.List;import java.util.concurrent.atomic.AtomicBoolean;import org.apache.kafka.clients.consumer.ConsumerRecord;import org.apache.kafka.clients.consumer.ConsumerRecords;import org.apache.kafka.clients.consumer.KafkaConsumer;import org.apache.kafka.common.errors.WakeupException;public class MutilConsumerThread implements Runnable{ private AtomicBoolean closed = new AtomicBoolean(false); KafkaConsumer consumer = null; String topic = null; public MutilConsumerThread(KafkaConsumer consumer,List topic) { this.consumer=consumer; consumer.subscribe(topic); } public void run() { try{ while(!closed.get()) { ConsumerRecords records = consumer.poll(1000); for(ConsumerRecord record: records) { //一组consumer的时候每个partition对应的线程是固定的 System.out.println("Thread-Name:"+Thread.currentThread().getName()+" "+"partition:"+record.partition()+" "+record.value()); } } }catch(WakeupException e ) { if(!closed.get()) throw e; }finally { consumer.close(); } } public void shutdown() { closed.set(true); consumer.wakeup(); }} package kafka.demo;import java.util.ArrayList;import java.util.Arrays;import java.util.List;import java.util.Properties;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import org.apache.kafka.clients.consumer.KafkaConsumer;public class MutiConsumerTest { public static void main(String[] args) throws InterruptedException { Properties props = new Properties(); props.put("bootstrap.servers", "bigdata01:9092,bigdata02:9092,bigdata03:9092"); props.put("group.id", "group_test7"); //配置topic的序列化类 props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); //配置value的序列化类 props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); //自动同步offset props.put("enable.auto.commit","true"); //自动同步offset的时间间隔 props.put("auto.commit.intervals.ms", "2000"); //当在zookeeper中发现要消费的topic没有或者topic的offset不合法时自动设置为最小值,可以设的值为 latest, earliest, none,默认为largest props.put("auto.offset.reset", "earliest "); String topic = "test7"; List consumers = new ArrayList<>(); ExecutorService es = Executors.newFixedThreadPool(3); for(int i = 0 ;i<=2;i++) { KafkaConsumer consumer = new KafkaConsumer(props); MutilConsumerThread cThread = new MutilConsumerThread(consumer,Arrays.asList(topic)); consumers.add(cThread); es.submit(cThread); } //Thread.sleep(1000L); /* 这个方法的意思就是在JVM中增加一个关闭的钩子,当JVM关闭的时候, 会执行系统中已经设置的所有通过方法addShutdownHook添加的钩子,当系统执行完这些钩子后, JVM才会关闭。所以这些钩子可以在JVM关闭的时候进行内存清理、对象销毁等操作。*/ Runtime.getRuntime().addShutdownHook(new Thread() { @Override public void run() { for(MutilConsumerThread consumer :consumers ) { consumer.shutdown(); } } }); }} 看完上述内容,你们掌握如何解析Kafka 1.0.0 多消费者示例的方法了吗?如果还想学到更多技能或想了解更多相关内容,欢迎关注行业资讯频道,感谢各位的阅读!
数据
配置
序列
方法
时候
钩子
消费
意味
消费者
示例
内容
就是
持久性
更多
系统
问题
同步
最低
最小
成功
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
家和网络技术有限公司
当你在服务器里有很多钻石时
北京心诚志远网络技术有限公司
数据库在云服务器的作用
小程序天津网络技术公司
网络安全链条app
网络安全硬件平台厂商
深圳布塔网络技术有限公司
专科软件开发工程师
佛山市永天网络技术有限公司
atm网络技术论文
超玩我的世界服务器
database数据库报错
青海办理网络技术转让的公司
天津必火网络安全
星空夺宝软件开发
网页上面数据库怎么下载
四川管理软件开发公司
软件开发用户数据
网络安全2022任子行
影视软件开发基础
c 在窗口之间传递数据库
数据库的父类
海康服务器
手机屏幕维修后无法连接服务器
ftp服务器的设置与管理
西游记网络安全宣传
什么是软件开发模型设计
应用软件开发工具官方最新版
计算机网络技术公开课课件