如何进行Kafka 1.0.0 d代码示例分析
发表于:2025-12-03 作者:千家信息网编辑
千家信息网最后更新 2025年12月03日,这篇文章将为大家详细讲解有关如何进行Kafka 1.0.0 d代码示例分析,文章内容质量较高,因此小编分享给大家做个参考,希望大家阅读完这篇文章后对相关知识有一定的了解。package kafka.d
千家信息网最后更新 2025年12月03日如何进行Kafka 1.0.0 d代码示例分析
这篇文章将为大家详细讲解有关如何进行Kafka 1.0.0 d代码示例分析,文章内容质量较高,因此小编分享给大家做个参考,希望大家阅读完这篇文章后对相关知识有一定的了解。
package kafka.demo;import java.util.HashMap;import java.util.Map;import org.apache.kafka.clients.producer.KafkaProducer;import org.apache.kafka.clients.producer.ProducerRecord;/** * *Description: kafka 1.0.0
* @author guangshihao * @date 2018年9月19日 * */public class KafkaProduderDemo { public static void main(String[] args) { Mapprops = new HashMap<>(); /* * acks,设置发送数据是否需要服务端的反馈,有三个值0,1,-1 * 0,意味着producer永远不会等待一个来自broker的ack,这就是0.7版本的行为。 * 这个选项提供了最低的延迟,但是持久化的保证是最弱的,当server挂掉的时候会丢失一些数据。 * 1,意味着在leader replica已经接收到数据后,producer会得到一个ack。 * 这个选项提供了更好的持久性,因为在server确认请求成功处理后,client才会返回。 * 如果刚写到leader上,还没来得及复制leader就挂了,那么消息才可能会丢失。 * -1,意味着在所有的ISR都接收到数据后,producer才得到一个ack。 * 这个选项提供了最好的持久性,只要还有一个replica存活,那么数据就不会丢失 */ props.put("acks", "1"); //配置默认的分区方式 props.put("partitioner.class", "org.apache.kafka.clients.producer.internals.DefaultPartitioner"); //配置topic的序列化类 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); //配置value的序列化类 props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); /* * kafka broker对应的主机,格式为host1:port1,host2:port2 */ props.put("bootstrap.servers", "bigdata01:9092,bigdata02:9092,bigdata03:9092"); //topic String topic = "test7"; KafkaProducer< String, String> producer = new KafkaProducer< String, String>(props); for(int i = 1 ;i <= 100 ; i++) { String line = i+" this is a test "; ProducerRecord record = new ProducerRecord (topic,line ); producer.send(record); } producer.close(); } }//---------------------------------------------------------------------------------------------------------------------------package kafka.demo;import java.util.Arrays;import java.util.Properties;import java.util.Random;import org.apache.kafka.clients.consumer.ConsumerRecord;import org.apache.kafka.clients.consumer.ConsumerRecords;import org.apache.kafka.clients.consumer.KafkaConsumer;import org.apache.kafka.common.errors.WakeupException;public class KafkaConsumerDemo {public static void main(String[] args) {Properties props = new Properties();props.put("bootstrap.servers", "bigdata01:9092,bigdata02:9092,bigdata03:9092");props.put("group.id", "group_test7");//配置topic的序列化类props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");//配置value的序列化类props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");//自动同步offset props.put("enable.auto.commit","true"); //自动同步offset的时间间隔 props.put("auto.commit.intervals.ms", "2000"); //当在zookeeper中发现要消费的topic没有或者topic的offset不合法时自动设置为最小值,可以设的值为 latest, earliest, none,默认为largest props.put("auto.offset.reset", "earliest "); KafkaConsumer consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList("test7"));//consumer.beginningOffsets("");try {while(true) {ConsumerRecords records = consumer.poll(1000);for(ConsumerRecord record: records) {System.out.println("partition:"+record.partition()+" "+record.value());}//consumer.commitSync();if((new Random(10)).nextInt()>5) {consumer.wakeup();}}}catch(WakeupException e) {e.printStackTrace();}finally {consumer.close();}}}
关于如何进行Kafka 1.0.0 d代码示例分析就分享到这里了,希望以上内容可以对大家有一定的帮助,可以学到更多知识。如果觉得文章不错,可以把它分享出去让更多的人看到。
数据
配置
序列
意味
代码
示例
分析
内容
持久性
文章
更多
知识
篇文章
同步
不错
最低
最小
成功
三个
主机
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
速可网络安全ppt
青岛直播软件开发机构
2018网络安全知识竞赛题
mongo 显示数据库
网络技术专业职业岗位
基因基础表达量 数据库
服务器和群晖nas冲突吗
最低价网约车打车软件开发
雷霆加速器的服务器地址
计算机网络安全选修课
三星软件开发者中心
尔雅网课计算机网络技术
简单的数据库管理系统设计
linux管理服务器命令
网络安全 第五空间全集
remium同步数据库
oracle添加数据库
生物信息三大数据库收录数据情况
网络安全生产红头文件
表格加密怎么导出数据库
信息化及网络安全总结
联想软件开发工程师 面试
数据库名称不一样登录不了
数据库原理与数据库命令是什么
东莞信息软件开发商家
明日之后白树高地部落服务器
不出名的软件开发公司
软件开发合同交什么税
服务器电源如何识别电压
网络安全视频下载