Springboot 2.x集成kafka 2.2.0的方法
发表于:2025-11-08 作者:千家信息网编辑
千家信息网最后更新 2025年11月08日,本文小编为大家详细介绍"Springboot 2.x集成kafka 2.2.0的方法",内容详细,步骤清晰,细节处理妥当,希望这篇"Springboot 2.x集成kafka 2.2.0的方法"文章能
千家信息网最后更新 2025年11月08日Springboot 2.x集成kafka 2.2.0的方法
本文小编为大家详细介绍"Springboot 2.x集成kafka 2.2.0的方法",内容详细,步骤清晰,细节处理妥当,希望这篇"Springboot 2.x集成kafka 2.2.0的方法"文章能帮助大家解决疑惑,下面跟着小编的思路慢慢深入,一起来学习新知识吧。
引言
kafka近几年更新非常快,也可以看出kafka在企业中是用的频率越来越高,在springboot中集成kafka还是比较简单的,但是应该注意使用的版本和kafka中基本配置,这个地方需要信心,防止进入坑中。
基本环境
springboot版本2.1.4
kafka版本2.2.0
jdk 1.8
代码编写
1、基本引用pom
4.0.0 org.springframework.boot spring-boot-starter-parent 2.1.4.RELEASE com.example demo 0.0.1-SNAPSHOT kafkademo Demo project for Spring Boot 1.8 org.springframework.boot spring-boot-starter-web mysql mysql-connector-java runtime org.springframework.boot spring-boot-starter-test test org.springframework.kafka spring-kafka 2.2.0.RELEASE com.google.code.gson gson 2.7 org.springframework.boot spring-boot-maven-plugin
2、基本配置
spring.kafka.bootstrap-servers=2.1.1.1:9092spring.kafka.consumer.group-id=test-consumer-groupspring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializerspring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializerspring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializerspring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer #logging.level.root=debug
3、实体类
package com.example.demo.model; import java.util.Date; public class Messages { private Long id; private String msg; private Date sendTime; public Long getId() { return id; } public void setId(Long id) { this.id = id; } public String getMsg() { return msg; } public void setMsg(String msg) { this.msg = msg; } public Date getSendTime() { return sendTime; } public void setSendTime(Date sendTime) { this.sendTime = sendTime; }}4、生产者端
package com.example.demo.service; import com.example.demo.model.Messages;import com.google.gson.Gson;import com.google.gson.GsonBuilder;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.kafka.core.KafkaTemplate;import org.springframework.kafka.support.SendResult;import org.springframework.stereotype.Service;import org.springframework.util.concurrent.ListenableFuture; import java.util.Date;import java.util.UUID; @Servicepublic class KafkaSender { @Autowired private KafkaTemplate kafkaTemplate; private Gson gson = new GsonBuilder().create(); public void send() { Messages message = new Messages(); message.setId(System.currentTimeMillis()); message.setMsg("123"); message.setSendTime(new Date()); ListenableFuture> test0 = kafkaTemplate.send("newtopic", gson.toJson(message)); }} 5、消费者
package com.example.demo.service; import org.apache.kafka.clients.consumer.ConsumerRecord;import org.springframework.kafka.annotation.KafkaListener;import org.springframework.stereotype.Service; import java.util.Optional; @Servicepublic class KafkaReceiver { @KafkaListener(topics = {"newtopic"}) public void listen(ConsumerRecord, ?> record) { Optional> kafkaMessage = Optional.ofNullable(record.value()); if (kafkaMessage.isPresent()) { Object message = kafkaMessage.get(); System.out.println("record =" + record); System.out.println("message =" + message); } } }6、测试
在启动方法中模拟消息生产者,向kafka中发送消息
package com.example.demo; import com.example.demo.service.KafkaSender;import org.springframework.boot.SpringApplication;import org.springframework.boot.autoconfigure.SpringBootApplication;import org.springframework.context.ConfigurableApplicationContext; @SpringBootApplicationpublic class KafkademoApplication { public static void main(String[] args) { ConfigurableApplicationContext context = SpringApplication.run(KafkademoApplication.class, args); KafkaSender sender = context.getBean(KafkaSender.class); for (int i = 0; i <1000; i++) { sender.send(); try { Thread.sleep(300); } catch (InterruptedException e) { e.printStackTrace(); } } } }效果展示
命令行直接消费消息
遇到的问题
生产端连接kafka超时
at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:119)
解决方案:
修改kafka中的server.properties中的下面配置,将原来的默认配置替换成下面ip+端口的形式,重启kafka
读到这里,这篇"Springboot 2.x集成kafka 2.2.0的方法"文章已经介绍完毕,想要掌握这篇文章的知识点还需要大家自己动手实践使用过才能领会,如果想了解更多相关内容的文章,欢迎关注行业资讯频道。
方法
配置
文章
消息
版本
生产
内容
生产者
消费
妥当
代码
企业
信心
命令
地方
基本配置
实体
引言
形式
思路
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
梅州虚拟服务器管理软件
无限小说软件开发
网络安全学习心得1000字
delphi动态连接数据库
东莞游戏软件开发
网信办网络安全调研文章
计算机做软件开发考研
文明6无法连接到服务器
当旅游遇上科技互联网
edusoho数据库配置
我的世界服务器被别人贴木牌
软件开发公司行业资质
免费的中文数据库有哪些
tnt放在服务器上会怎么样
普通电脑做云服务器
软件开发tc版本
云数据库的建立
计算机网络安全的产生和发展
牟平区软件开发
数据库如何重复显示两条数据
取消服务器和开机选项
国家安全局网络安全科
数据库的 角色
健身房设计软件开发
数据库表结构的优化
软件开发怎么做需求分析
数据库多对多怎么画
星际数据库.chm
为链接上服务器
服务器的硬件分类