千家信息网

如何进行kafka批量消费多消费者问题分析

发表于:2025-12-01 作者:千家信息网编辑
千家信息网最后更新 2025年12月01日,今天就跟大家聊聊有关如何进行kafka批量消费多消费者问题分析,可能很多人都不太了解,为了让大家更加了解,小编给大家总结了以下内容,希望大家根据这篇文章可以有所收获。package com.llw.m
千家信息网最后更新 2025年12月01日如何进行kafka批量消费多消费者问题分析

今天就跟大家聊聊有关如何进行kafka批量消费多消费者问题分析,可能很多人都不太了解,为了让大家更加了解,小编给大家总结了以下内容,希望大家根据这篇文章可以有所收获。

package com.llw.medical.bs.listener;import org.apache.kafka.clients.consumer.ConsumerRecord;import org.springframework.kafka.annotation.KafkaListener;import org.springframework.kafka.annotation.TopicPartition;import org.springframework.stereotype.Component;import java.util.List;import java.util.Optional;@Componentpublic class KafakaListener {@KafkaListener(id = "1", topics = {"topic2"})public void listen(ConsumerRecord record) {        Optional kafkaMessage = Optional.ofNullable(record.value());if (kafkaMessage.isPresent()) {            Object message = kafkaMessage.get();            System.out.println("----------------- record =" + record);            System.out.println("----------------- message =" + message);        }    }@KafkaListener(id = "2", topicPartitions =            {@TopicPartition(topic = "topic1",                    partitions = {"1", "2", "3"}// partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "4")            )            })public void listen2(ConsumerRecord record) {        Optional kafkaMessage = Optional.ofNullable(record.value());if (kafkaMessage.isPresent()) {            Object message = kafkaMessage.get();            System.out.println("----------------- record 1=" + record);            System.out.println("------------------ message 1=" + message);        }    }//id = "4",    //id="4"    @KafkaListener( id= "4",groupId = "1",topics="topic1", /*topicPartitions =            {@TopicPartition(topic = "topic1",                    partitions = {"0"}                    // partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "4")            )*//*            },*/ containerFactory = "kafkaBatchListener6")public void listen3(List> records) {//, Acknowledgment ack        try {for (ConsumerRecord record : records) {                Optional kafkaMessage = Optional.ofNullable(record.value());if (kafkaMessage.isPresent()) {                    Object message = kafkaMessage.get();                    System.out.println("----------------- record 4=" + record);//   System.out.println("------------------ message 4=" + message);                }            }        } finally {//   ack.acknowledge();        }    }//id="5"    @KafkaListener(id = "5",groupId = "1",topics="topic1", /*topicPartitions =            {@TopicPartition(topic = "topic1",                    partitions = {"0"}                    // partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "4")            )            },*/ containerFactory = "kafkaBatchListener6")public void listen2(List> records) {//, Acknowledgment ack        try {for (ConsumerRecord record : records) {                Optional kafkaMessage = Optional.ofNullable(record.value());if (kafkaMessage.isPresent()) {                    Object message = kafkaMessage.get();                    System.out.println("----------------- record 6=" + record);//   System.out.println("------------------ message 6=" + message);                }            }        } finally {//   ack.acknowledge();        }    }//https://www.cnblogs.com/linjiqin/p/13171789.html    @KafkaListener(id = "6",groupId = "1",topics="topic1",/* topicPartitions =            {@TopicPartition(topic = "topic1",                    partitions = {"0"}                    // partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "4")            )            }, */containerFactory = "kafkaBatchListener6")public void listen4(List> records) {try {for (ConsumerRecord record : records) {                Optional kafkaMessage = Optional.ofNullable(record.value());if (kafkaMessage.isPresent()) {                    Object message = kafkaMessage.get();                    System.out.println("----------------- record 3=" + record);//   System.out.println("------------------ message 6=" + message);                }            }        } finally {//   ack.acknowledge();        }    }}

一个partition只能有一个消费者,如果多个消费者会是广播模式,每个消费者都会有一条数据,kafka是一个发布和订阅模式的主键,并不是队列模式,

spring boot整合时,如果使用topicPartitions 注解参数指定partition会有消息重复消费的问题,最好使用topics注解,并指定groupId。

看完上述内容,你们对如何进行kafka批量消费多消费者问题分析有进一步的了解吗?如果还想了解更多知识或者相关内容,请关注行业资讯频道,感谢大家的支持。

消费 消费者 问题 内容 模式 费多 分析 注解 参数 多个 数据 更多 最好 消息 知识 篇文章 行业 资讯 资讯频道 队列 数据库的安全要保护哪些东西 数据库安全各自的含义是什么 生产安全数据库录入 数据库的安全性及管理 数据库安全策略包含哪些 海淀数据库安全审计系统 建立农村房屋安全信息数据库 易用的数据库客户端支持安全管理 连接数据库失败ssl安全错误 数据库的锁怎样保障安全 网络安全实施办法 党委 数钥网络技术有限公司陈小姐 a10网络安全信息活动简报图片 sql数据库表单满了怎么解决 cswp考试老是无法连接服务器 虚拟服务器的特性和优点 防网络安全诈骗安全 数据库中实体关系图 天水市网络安全管理 软件开发初中文化难学吗 专升本数据库管理系统知识点 虹口区机电软件开发口碑推荐 美西方面临的网络安全风险 百度智能云服务器续费 软件开发部主管的QA职责 青岛信息城服务器组装 delphi数据库操作 网络工程(网络安全方向) 服务器开机输入管理员密码 传奇ip地址和服务器名称 2022年软件开发岗位需求量 戴尔服务器自检一般多久 企业各部门怎样共享一个数据库 空间数据库文件管理模式 购买网络安全设备的申请 网络安全板报字 如何用服务器管理所有电脑 服务器文件查询管理系统 怎样查看酒店服务器的lp地址 美国5g网络技术怎么样
0