如何进行kafka批量消费多消费者问题分析
发表于:2025-12-01 作者:千家信息网编辑
千家信息网最后更新 2025年12月01日,今天就跟大家聊聊有关如何进行kafka批量消费多消费者问题分析,可能很多人都不太了解,为了让大家更加了解,小编给大家总结了以下内容,希望大家根据这篇文章可以有所收获。package com.llw.m
千家信息网最后更新 2025年12月01日如何进行kafka批量消费多消费者问题分析
今天就跟大家聊聊有关如何进行kafka批量消费多消费者问题分析,可能很多人都不太了解,为了让大家更加了解,小编给大家总结了以下内容,希望大家根据这篇文章可以有所收获。
package com.llw.medical.bs.listener;import org.apache.kafka.clients.consumer.ConsumerRecord;import org.springframework.kafka.annotation.KafkaListener;import org.springframework.kafka.annotation.TopicPartition;import org.springframework.stereotype.Component;import java.util.List;import java.util.Optional;@Componentpublic class KafakaListener {@KafkaListener(id = "1", topics = {"topic2"})public void listen(ConsumerRecord, ?> record) { Optional> kafkaMessage = Optional.ofNullable(record.value());if (kafkaMessage.isPresent()) { Object message = kafkaMessage.get(); System.out.println("----------------- record =" + record); System.out.println("----------------- message =" + message); } }@KafkaListener(id = "2", topicPartitions = {@TopicPartition(topic = "topic1", partitions = {"1", "2", "3"}// partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "4") ) })public void listen2(ConsumerRecord, ?> record) { Optional> kafkaMessage = Optional.ofNullable(record.value());if (kafkaMessage.isPresent()) { Object message = kafkaMessage.get(); System.out.println("----------------- record 1=" + record); System.out.println("------------------ message 1=" + message); } }//id = "4", //id="4" @KafkaListener( id= "4",groupId = "1",topics="topic1", /*topicPartitions = {@TopicPartition(topic = "topic1", partitions = {"0"} // partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "4") )*//* },*/ containerFactory = "kafkaBatchListener6")public void listen3(List> records) {//, Acknowledgment ack try {for (ConsumerRecord, ?> record : records) { Optional> kafkaMessage = Optional.ofNullable(record.value());if (kafkaMessage.isPresent()) { Object message = kafkaMessage.get(); System.out.println("----------------- record 4=" + record);// System.out.println("------------------ message 4=" + message); } } } finally {// ack.acknowledge(); } }//id="5" @KafkaListener(id = "5",groupId = "1",topics="topic1", /*topicPartitions = {@TopicPartition(topic = "topic1", partitions = {"0"} // partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "4") ) },*/ containerFactory = "kafkaBatchListener6")public void listen2(List> records) {//, Acknowledgment ack try {for (ConsumerRecord, ?> record : records) { Optional> kafkaMessage = Optional.ofNullable(record.value());if (kafkaMessage.isPresent()) { Object message = kafkaMessage.get(); System.out.println("----------------- record 6=" + record);// System.out.println("------------------ message 6=" + message); } } } finally {// ack.acknowledge(); } }//https://www.cnblogs.com/linjiqin/p/13171789.html @KafkaListener(id = "6",groupId = "1",topics="topic1",/* topicPartitions = {@TopicPartition(topic = "topic1", partitions = {"0"} // partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "4") ) }, */containerFactory = "kafkaBatchListener6")public void listen4(List> records) {try {for (ConsumerRecord, ?> record : records) { Optional> kafkaMessage = Optional.ofNullable(record.value());if (kafkaMessage.isPresent()) { Object message = kafkaMessage.get(); System.out.println("----------------- record 3=" + record);// System.out.println("------------------ message 6=" + message); } } } finally {// ack.acknowledge(); } }} 一个partition只能有一个消费者,如果多个消费者会是广播模式,每个消费者都会有一条数据,kafka是一个发布和订阅模式的主键,并不是队列模式,
spring boot整合时,如果使用topicPartitions 注解参数指定partition会有消息重复消费的问题,最好使用topics注解,并指定groupId。
看完上述内容,你们对如何进行kafka批量消费多消费者问题分析有进一步的了解吗?如果还想了解更多知识或者相关内容,请关注行业资讯频道,感谢大家的支持。
消费
消费者
问题
内容
模式
费多
分析
注解
参数
多个
数据
更多
最好
消息
知识
篇文章
行业
资讯
资讯频道
队列
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
网络安全实施办法 党委
数钥网络技术有限公司陈小姐
a10网络安全信息活动简报图片
sql数据库表单满了怎么解决
cswp考试老是无法连接服务器
虚拟服务器的特性和优点
防网络安全诈骗安全
数据库中实体关系图
天水市网络安全管理
软件开发初中文化难学吗
专升本数据库管理系统知识点
虹口区机电软件开发口碑推荐
美西方面临的网络安全风险
百度智能云服务器续费
软件开发部主管的QA职责
青岛信息城服务器组装
delphi数据库操作
网络工程(网络安全方向)
服务器开机输入管理员密码
传奇ip地址和服务器名称
2022年软件开发岗位需求量
戴尔服务器自检一般多久
企业各部门怎样共享一个数据库
空间数据库文件管理模式
购买网络安全设备的申请
网络安全板报字
如何用服务器管理所有电脑
服务器文件查询管理系统
怎样查看酒店服务器的lp地址
美国5g网络技术怎么样