Springboot如何集成Kafka进行批量消费
发表于:2025-11-08 作者:千家信息网编辑
千家信息网最后更新 2025年11月08日,本篇内容主要讲解"Springboot如何集成Kafka进行批量消费",感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习"Springboot如何集成Kafka进
千家信息网最后更新 2025年11月08日Springboot如何集成Kafka进行批量消费
本篇内容主要讲解"Springboot如何集成Kafka进行批量消费",感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习"Springboot如何集成Kafka进行批量消费"吧!
引入依赖
org.springframework.kafka spring-kafka 1.3.11.RELEASE
因为我的项目的 springboot 版本是 1.5.22.RELEASE,所以引的是 1.3.11.RELEASE 的包。读者可以根据下图来自行选择对应的版本。图片更新可能不及时,详情可查看spring-kafka 官方网站。

注:这里有个踩坑点,如果引入包版本不对,项目启动时会抛出org.springframework.core.log.LogAccessor 异常:
java.lang.ClassNotFoundException: org.springframework.core.log.LogAccessor
创建配置类
/** * kafka 配置类 */ @Configuration @EnableKafka public class KafkaConsumerConfig { private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(KafkaConsumerConfig.class); @Value("${kafka.bootstrap.servers}") private String kafkaBootstrapServers; @Value("${kafka.group.id}") private String kafkaGroupId; @Value("${kafka.topic}") private String kafkaTopic; public static final String CONFIG_PATH = "/home/admin/xxx/BOOT-INF/classes/kafka_client_jaas.conf"; public static final String LOCATION_PATH = "/home/admin/xxx/BOOT-INF/classes/kafka.client.truststore.jks"; @Bean public KafkaListenerContainerFactory> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); // 设置并发量,小于或者等于 Topic 的分区数 factory.setConcurrency(5); // 设置为批量监听 factory.setBatchListener(Boolean.TRUE); factory.getContainerProperties().setPollTimeout(30000); return factory; } public ConsumerFactory consumerFactory() { return new DefaultKafkaConsumerFactory<>(consumerConfigs()); } public Map consumerConfigs() { Map props = new HashMap<>(); //设置接入点,请通过控制台获取对应Topic的接入点。 props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers); //设置SSL根证书的路径,请记得将XXX修改为自己的路径。 //与SASL路径类似,该文件也不能被打包到jar中。 System.setProperty("java.security.auth.login.config", CONFIG_PATH); props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, LOCATION_PATH); //根证书存储的密码,保持不变。 props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "KafkaOnsClient"); //接入协议,目前支持使用SASL_SSL协议接入。 props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL"); //SASL鉴权方式,保持不变。 props.put(SaslConfigs.SASL_MECHANISM, "PLAIN"); // 自动提交 props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, Boolean.TRUE); //两次Poll之间的最大允许间隔。 //消费者超过该值没有返回心跳,服务端判断消费者处于非存活状态,服务端将消费者从Consumer Group移除并触发Rebalance,默认30s。 props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000); //设置单次拉取的量,走公网访问时,该参数会有较大影响。 props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 32000); props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 32000); //每次Poll的最大数量。 //注意该值不要改得太大,如果Poll太多数据,而不能在下次Poll之前消费完,则会触发一次负载均衡,产生卡顿。 props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 30); //消息的反序列化方式。 props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); //当前消费实例所属的消费组,请在控制台申请之后填写。 //属于同一个组的消费实例,会负载消费消息。 props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaGroupId); //Hostname校验改成空。 props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, ""); return props; } } 注:此处通过 factory.setConcurrency(5); 配置了并发量为 5 ,假设我们线上的 Topic 有 12 个分区。那么将会是 3 个线程分配到 2 个分区,2 个线程分配到 3 个分区,3 * 2 + 2 * 3 = 12。
Kafka 消费者
/** * kafka 消息消费类 */ @Component public class KafkaMessageListener { private static final Logger LOGGER = LoggerFactory.getLogger(KafkaMessageListener.class); @KafkaListener(topics = {"${kafka.topic}"}) public void listen(List> recordList) { for (ConsumerRecord record : recordList) { // 打印消息的分区以及偏移量 LOGGER.info("Kafka Consume partition:{}, offset:{}", record.partition(), record.offset()); String value = record.value(); System.out.println("value = " + value); // 处理业务逻辑 ... } } } 因为我在配置类中设置了批量监听,所以此处 listen 方法的入参是List:List
到此,相信大家对"Springboot如何集成Kafka进行批量消费"有了更深的了解,不妨来实际操作一番吧!这里是网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!
消费
消息
消费者
接入
配置
版本
路径
最大
内容
实例
接入点
控制台
方式
方法
线程
网站
证书
项目
分配
学习
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
疯狂大饭店 访问服务器失败
tbc联盟哪个服务器比较好
青海网络安全事件应急演练
睡眠监测仪软件开发设备清单表
网络安全的可用性是什么
北京鑫宝源互联网科技公司官网
网络安全你我他绘画
怎么用控制台登录5e服务器
苏州电商软件开发产品介绍
魔兽世界正式服阿拉希服务器
mc服务器平台
软件开发公司没有进项
数据库所采用的安全措施
如何只输入服务器的ip
工作流都可以用什么软件开发
x86服务器最多能用几个cpu
广西犀牛互联网科技有限公司
存储服务器配比
河南创元网络技术
什么是统一软件开发过程
服务器2008什么时候发布
电子邮件服务器填充
罗湖网络安全定制
公益网络安全宣传官有啥用
数据库字符集设置语句
自动备份还原数据库
软件开发中的软件详细计划
ios 数据库使用注意
服务器excel推荐
软件开发599.99啥意思