Message Queues: Kafka (API)
1. Implementing a Kafka producer and consumer with the native API
Add the required Maven dependency:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.12</artifactId>
    <version>2.1.0</version>
</dependency>

Producer:
package com.zy.kafka;

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class KafkaTest {
    public static void main(String[] args) {
        // 1. Build the configuration
        Properties prps = new Properties();
        // Broker address
        prps.put("bootstrap.servers", "hadoop02:9092");
        // Ack level: 0, 1 or -1 (all)
        prps.put("acks", "all");
        // Number of retries
        prps.put("retries", 3);
        prps.put("batch.size", 16384);
        prps.put("linger.ms", 1);
        prps.put("buffer.memory", 33554432);
        // Serializers for the message key and value
        prps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        prps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // 2. Create the producer (key and value types given as generics)
        Producer<String, String> producer = new KafkaProducer<>(prps);
        // Send the messages
        for (int i = 0; i < 100; i++) {
            /**
             * ProducerRecord(topic, key, value)
             * topic: topic name
             * key:   message key
             * value: message value
             */
            ProducerRecord<String, String> pr = new ProducerRecord<>("test_topic", "key" + i, "value" + i);
            producer.send(pr);
        }
        producer.close();
    }
}
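The send() above is fire-and-forget: the result of each send is never checked. If delivery confirmation is needed, a Callback can be passed to send(). The fragment below is only a sketch of that variant, not part of the original code; it reuses the producer and record from the loop above and additionally needs the org.apache.kafka.clients.producer.Callback and RecordMetadata imports.

// Sketch: asynchronous send with a delivery callback, replacing producer.send(pr) above
producer.send(pr, new Callback() {
    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        if (exception != null) {
            // Delivery failed even after the configured retries
            exception.printStackTrace();
        } else {
            System.out.printf("sent to partition %d at offset %d%n",
                    metadata.partition(), metadata.offset());
        }
    }
});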
Consumer:

package com.zy.kafka;

import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class KafkaTest {
    public static void main(String[] args) {
        // 1. Build the configuration
        Properties prps = new Properties();
        // Broker address
        prps.put("bootstrap.servers", "hadoop02:9092");
        // Consumer group ID
        prps.put("group.id", "test");
        // Enable automatic offset commits (offsets are committed back to Kafka)
        prps.put("enable.auto.commit", "true");
        // Auto-commit interval
        prps.put("auto.commit.interval.ms", "1000");
        // Deserializers for the message key and value
        prps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        prps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        // Create the consumer
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(prps);
        // Subscribe to the topic(s) to consume
        consumer.subscribe(Arrays.asList("kafka_test"));
        // Start consuming
        while (true) {
            // Poll the broker; returns the records fetched since the last poll
            ConsumerRecords<String, String> poll = consumer.poll(10);
            for (ConsumerRecord<String, String> p : poll) {
                System.out.printf("offset=%d,key=%s,value=%s\n", p.offset(), p.key(), p.value());
            }
        }
    }
}
2. Driving Kafka with the shell-command-style API

import java.io.IOException;
import kafka.admin.TopicCommand;

public class KafkaAPI {
    public static void main(String[] args) throws IOException {
        /*
         kafka-topics.sh \
           --create \
           --zookeeper hadoop02:2181,hadoop03:2181,hadoop04:2181 \
           --replication-factor 3 \
           --partitions 10 \
           --topic kafka_test11
        */
        // Arguments for creating a topic
        String[] ops = new String[]{
                "--create",
                "--zookeeper", "hadoop01:2181,hadoop02:2181,hadoop03:2181",
                "--replication-factor", "3",
                "--topic", "zy_topic",
                "--partitions", "5"
        };
        // Arguments for listing topics
        String[] list = new String[]{
                "--list",
                "--zookeeper", "hadoop01:2181,hadoop02:2181,hadoop03:2181"
        };
        // Submit the arguments exactly as the shell command would
        TopicCommand.main(list);
    }
}
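kafka.admin.TopicCommand is an internal class of the broker package. The same administration tasks can also be done with the public AdminClient that ships with kafka-clients; the snippet below is only an illustrative sketch (the broker list and topic name are taken from the examples above) and is not part of the original article.

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;

public class TopicAdminSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // AdminClient talks to the brokers directly, not to ZooKeeper
        props.put("bootstrap.servers", "hadoop01:9092,hadoop02:9092,hadoop03:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // Create a topic with 5 partitions and replication factor 3
            admin.createTopics(Collections.singleton(new NewTopic("zy_topic", 5, (short) 3))).all().get();
            // List the existing topic names
            System.out.println(admin.listTopics().names().get());
        }
    }
}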
3. Advanced API operations

Common shell operations:
#!/usr/bin/env bash
# List Kafka topics
kafka-topics.sh --list --zookeeper hadoop01:2181,hadoop02:2181,hadoop03:2181
# Show a topic's offsets
kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list hadoop01:9092,hadoop02:9092,hadoop03:9092 --topic kafka_api_r1p1
# Create a topic
kafka-topics.sh --create --zookeeper hadoop01:2181,hadoop02:2181,hadoop03:2181 --partitions 3 --replication-factor 1 --topic kafka_api_r1p3
# Delete a topic
kafka-topics.sh --delete --zookeeper hadoop01:2181,hadoop02:2181,hadoop03:2181 --topic act_inventory_r1p1_test1
# Show the offsets of a specific consumer group
kafka-consumer-groups.sh
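The last command above is truncated in the original. For reference, a typical invocation to inspect a group's offsets looks roughly like the line below; the group name is the one used by the consumers later in this article, and the available flags can vary slightly between Kafka versions.

kafka-consumer-groups.sh --bootstrap-server hadoop01:9092,hadoop02:9092,hadoop03:9092 --describe --group kafka_api_group_1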
① A simple consumer with Kafka managing the offsets automatically (single-partition consumption)

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Arrays;
import java.util.Properties;

/**
 * Created with IntelliJ IDEA.
 * User: ZZY
 * Date: 2019/9/9
 * Time: 19:44
 * Description: A simple consumer; Kafka manages the offsets automatically (single-partition consumption).
 */
public class MyConsumer01 {
    private static Properties props = new Properties();

    static {
        props.put("group.id", "kafka_api_group_2");
        // Kafka cluster addresses
        props.put("bootstrap.servers", "hadoop01:9092,hadoop02:9092,hadoop03:9092");
        // Enable automatic offset commits
        props.put("enable.auto.commit", "true");
        // Or commit offsets manually instead:
        //props.put("enable.auto.commit", "false");
        // Auto-commit interval
        props.put("auto.commit.interval.ms", "100");
        // Where to start when the group has no committed offset
        props.put("auto.offset.reset", "earliest");
        // Deserializers
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    }

    public static void main(String[] args) throws InterruptedException {
        String topic = "kafka_api_r1p1";
        // Instantiate a consumer
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        // Subscribe to one or more topics
        //consumer.subscribe(Collections.singleton(topic));
        consumer.subscribe(Arrays.asList(topic));
        // Keep polling the broker for new records
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(10);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset=%d,key=%s,value=%s%n", record.offset(), record.key(), record.value());
            }
            Thread.sleep(2000);
        }
        //consumer.commitAsync(); // commit offsets manually when auto-commit is disabled
    }
}
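The commented-out commitAsync() call hints at manual offset management. Below is a minimal sketch of what that could look like, assuming enable.auto.commit is switched to "false"; it would sit inside the poll loop after the records are processed and additionally needs the java.util.Map, org.apache.kafka.common.TopicPartition, org.apache.kafka.clients.consumer.OffsetAndMetadata and OffsetCommitCallback imports. It is not part of the original code.

// Sketch: asynchronously commit the offsets of the records returned by the last poll
consumer.commitAsync(new OffsetCommitCallback() {
    @Override
    public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
        if (exception != null) {
            // The async commit failed; a real application might log it and fall back to commitSync()
            exception.printStackTrace();
        }
    }
});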
② Multi-partition consumption

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Properties;

/**
 * Created with IntelliJ IDEA.
 * User: ZZY
 * Date: 2019/9/10
 * Time: 8:55
 * Description: Multi-partition consumption.
 */
public class MyConsumer02 {
    private static Properties props = new Properties();

    static {
        // Kafka cluster addresses
        props.put("bootstrap.servers", "hadoop01:9092,hadoop02:9092,hadoop03:9092");
        // Consumer group; consumers with the same group name belong to the same group
        props.put("group.id", "kafka_api_group_1");
        // Disable automatic offset commits (offsets are committed manually below)
        props.put("enable.auto.commit", "false");
        // Deserializers
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    }

    public static void main(String[] args) {
        String topicName = "kafka_api_r1p3";
        // Instantiate a consumer
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        // Subscribe to one or more topics
        consumer.subscribe(Arrays.asList(topicName));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
            // Handle the records partition by partition
            for (TopicPartition partition : records.partitions()) {
                System.out.println("Consuming partition " + partition.partition());
                // Records belonging to this partition
                List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
                for (ConsumerRecord<String, String> partitionRecord : partitionRecords) {
                    System.out.println("partition:" + partition.partition() + ",key:" + partitionRecord.key()
                            + ",value:" + partitionRecord.value() + ",offset:" + partitionRecord.offset());
                }
                // Commit this partition's offset (the offset of its last record + 1)
                long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
                consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
            }
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
③ Consuming from an explicitly assigned partition

Note: once specific partitions are assigned manually,
(1) the consumer-group coordination that Kafka normally provides no longer applies;
(2) different consumers may end up assigned the same partition, so to avoid offset-commit conflicts each consumer instance should use a distinct group.id.
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Properties;

/**
 * Created with IntelliJ IDEA.
 * User: ZZY
 * Date: 2019/9/10
 * Time: 10:10
 * Description: A consumer that pulls data from an explicitly assigned partition.
 * Once specific partitions are assigned manually:
 * (1) the consumer-group coordination Kafka provides no longer applies;
 * (2) different consumers may be assigned the same partition, so each consumer
 *     instance should use a distinct group.id to avoid offset-commit conflicts.
 */
public class MyConsumer03 {
    private static Properties props = new Properties();
    // The consumer instance
    static KafkaConsumer<String, String> consumer;

    static {
        // Kafka cluster addresses
        props.put("bootstrap.servers", "hadoop01:9092,hadoop02:9092,hadoop03:9092");
        // Consumer group; consumers with the same group name belong to the same group
        props.put("group.id", "kafka_api_group_1");
        // Disable automatic offset commits
        props.put("enable.auto.commit", "false");
        // Deserializers
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        consumer = new KafkaConsumer<>(props);
    }

    public static void main(String[] args) {
        String topic = "kafka_api_r1p3";
        int partitionNum = 0;
        // Assign the consumer a specific partition instead of subscribing
        TopicPartition partition0 = new TopicPartition(topic, partitionNum);
        consumer.assign(Arrays.asList(partition0));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
            for (TopicPartition partition : records.partitions()) {
                List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
                for (ConsumerRecord<String, String> partitionRecord : partitionRecords) {
                    System.out.println("partition:" + partitionRecord.partition() + ",key:" + partitionRecord.key()
                            + ",value:" + partitionRecord.value() + ",offset:" + partitionRecord.offset());
                }
                // Commit this partition's offset
                long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
                consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
            }
        }
    }
}
④ Resetting a consumer group's offsets

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Date;
import java.util.Properties;

/**
 * Created with IntelliJ IDEA.
 * User: ZZY
 * Date: 2019/9/10
 * Time: 9:46
 * Description: Resets the offsets of a Kafka consumer group.
 */
public class ReSetOffset {
    // The group whose offsets will be reset
    private static final String group = "kafka_api_group_1";
    private static final Properties props = new Properties();
    static KafkaConsumer<String, String> consumer;

    static {
        props.put("bootstrap.servers", "hadoop01:9092,hadoop02:9092,hadoop03:9092");
        props.put("group.id", group);
        props.put("enable.auto.commit", "true");
        //props.put("auto.offset.reset", "earliest");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        consumer = new KafkaConsumer<>(props);
    }

    public static String resetOffset(String topic, long offset) {
        int partitionNums = getTopicPartitionNum(topic);
        // Seek every partition of the topic to the requested offset; auto-commit
        // persists the new position when the temporary consumer is closed
        for (int i = 0; i < partitionNums; i++) {
            TopicPartition tp = new TopicPartition(topic, i);
            KafkaConsumer<String, String> consumer_temp = new KafkaConsumer<>(props);
            consumer_temp.assign(Arrays.asList(tp));
            consumer_temp.seek(tp, offset);
            consumer_temp.close();
        }
        consumer.close();
        SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss ");
        return dateFormat.format(new Date()) + group + " ResetOffset Succeed!!";
    }

    private static int getTopicPartitionNum(String topic) {
        return consumer.partitionsFor(topic).size();
    }

    public static void main(String[] args) {
        String topic = "kafka_api_r1p1";
        System.out.println(ReSetOffset.resetOffset(topic, 0));
    }
}
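The reset above works by seeking each partition and relying on auto-commit when the temporary consumer is closed. An alternative, sketched below only for illustration (it reuses the props and topic from ReSetOffset and additionally needs the java.util.Collections and OffsetAndMetadata imports), is to commit the target offset explicitly, which does not depend on the auto-commit setting:

// Sketch: commit offset 0 explicitly for every partition of the topic
try (KafkaConsumer<String, String> c = new KafkaConsumer<>(props)) {
    int partitions = c.partitionsFor("kafka_api_r1p1").size();
    for (int i = 0; i < partitions; i++) {
        TopicPartition tp = new TopicPartition("kafka_api_r1p1", i);
        c.assign(Arrays.asList(tp));
        c.commitSync(Collections.singletonMap(tp, new OffsetAndMetadata(0L)));
    }
}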
⑤ A multithreaded consumer

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;
import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Created with IntelliJ IDEA.
 * User: ZZY
 * Date: 2019/9/10
 * Time: 10:45
 * Description: A single consumer thread.
 */
public class ConsumerRunner implements Runnable {
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final KafkaConsumer<String, String> consumer;
    private final CountDownLatch latch;

    public ConsumerRunner(KafkaConsumer<String, String> consumer, CountDownLatch latch) {
        this.consumer = consumer;
        this.latch = latch;
    }

    @Override
    public void run() {
        System.out.println("threadName....." + Thread.currentThread().getName());
        try {
            consumer.subscribe(Arrays.asList("kafka_api_r1p1"));
            while (!closed.get()) {
                ConsumerRecords<String, String> records = consumer.poll(150);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("threadName= %s, offset = %d, key = %s, value = %s%n",
                            Thread.currentThread().getName(), record.offset(), record.key(), record.value());
                }
            }
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        } catch (WakeupException e) {
            // Ignore the wakeup exception if we are shutting down
            if (!closed.get()) {
                throw e;
            }
        } finally {
            consumer.close();
            latch.countDown();
        }
    }

    public void shutdown() {
        System.out.println("close ConsumerRunner");
        closed.set(true);
        consumer.wakeup();
    }
}

import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * Created with IntelliJ IDEA.
 * User: ZZY
 * Date: 2019/9/10
 * Time: 10:52
 * Description: Runs the consumer in multiple threads.
 */
public class RunConsumer {
    private static Properties props = new Properties();

    static {
        // Kafka cluster addresses
        props.put("bootstrap.servers", "hadoop01:9092,hadoop02:9092,hadoop03:9092");
        // Consumer group; consumers with the same group name belong to the same group
        props.put("group.id", "kafka_api_group_1");
        // Enable automatic offset commits
        props.put("enable.auto.commit", "true");
        // Auto-commit interval
        props.put("auto.commit.interval.ms", "1000");
        // Deserializers
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    }

    public static void main(String[] args) {
        // One KafkaConsumer instance per thread (KafkaConsumer is not thread-safe)
        final List<ConsumerRunner> consumers = new ArrayList<>();
        final List<KafkaConsumer<String, String>> kafkaConsumers = new ArrayList<>();
        for (int i = 0; i < 2; i++) {
            kafkaConsumers.add(new KafkaConsumer<String, String>(props));
        }
        // The latch blocks the main thread with await() and is decremented with countDown();
        // when it reaches 0 the main thread wakes up, similar to join
        final CountDownLatch latch = new CountDownLatch(2);
        final ExecutorService executor = Executors.newFixedThreadPool(2);
        for (int i = 0; i < 2; i++) {
            ConsumerRunner c = new ConsumerRunner(kafkaConsumers.get(i), latch);
            consumers.add(c);
            executor.submit(c);
        }
        /**
         * Register a JVM shutdown hook: when the JVM shuts down it runs every hook added
         * with addShutdownHook before exiting, so the hooks can clean up memory, destroy
         * objects and close connections.
         */
        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                System.out.println("....................");
                for (ConsumerRunner consumer : consumers) {
                    consumer.shutdown();
                }
                executor.shutdown();
                try {
                    executor.awaitTermination(5000, TimeUnit.MICROSECONDS);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        try {
            latch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}