千家信息网

SpringBoot Kafka 整合的使用方法

发表于:2025-12-02 作者:千家信息网编辑
千家信息网最后更新 2025年12月02日,这篇文章主要讲解了"SpringBoot Kafka 整合的使用方法",文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习"SpringBoot Kafka 整
千家信息网最后更新 2025年12月02日SpringBoot Kafka 整合的使用方法

这篇文章主要讲解了"SpringBoot Kafka 整合的使用方法",文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习"SpringBoot Kafka 整合的使用方法"吧!

创建项目

项目整体架构:

使用 IDEA 创建 SpringBoot 项目,这个很简单了,这里不做过多的讲解。

1、pom 文件代码如下:

    4.0.0    com.zhisheng    kafka-learning    0.0.1-SNAPSHOT    jar    kafka-learning    Demo project for Spring Boot + kafka            org.springframework.boot        spring-boot-starter-parent        1.5.9.RELEASE                         UTF-8        UTF-8        1.8                            org.springframework.boot            spring-boot-starter-web                            org.projectlombok            lombok            true                            org.springframework.boot            spring-boot-starter-test            test                            org.springframework.kafka            spring-kafka            1.1.1.RELEASE                            com.google.code.gson            gson            2.8.2                                                    org.springframework.boot                spring-boot-maven-plugin                        

主要引入了 spring-kafka 、lombok 、 gson 依赖。

2、消息实体类 Message.java 如下:

@Datapublic class Message {    private Long id;    //id    private String msg; //消息    private Date sendTime;  //时间戳}

3、消息发送类 KafkaSender.java

@Component@Slf4jpublicclass KafkaSender {        @Autowired        private KafkaTemplate kafkaTemplate;        private Gson gson = new GsonBuilder().create();        //发送消息方法        public void send() {                Message message = new Message();                message.setId(System                                .currentTimeMillis());                message.setMsg(UUID.randomUUID().toString());                message.setSendTime(new Date())                ;                log.info("+++++++++++++++++++++  message = {}", gson.toJson(message));                kafkaTemplate.send                                ("zhisheng", gson.toJson(message));        }}

就这样,发送消息代码就实现了。

这里关键的代码为 kafkaTemplate.send() 方法, zhisheng 是 Kafka 里的 topic ,这个 topic 在 Java 程序中是不需要提前在 Kafka 中设置的,因为它会在发送的时候自动创建你设置的 topic, gson.toJson(message) 是消息内容,这里暂时先说这么多了,不详解了,后面有机会继续把里面源码解读写篇博客出来(因为中途碰到坑,老子跟了几遍源码)。

4、消息接收类 KafkaReceiver.java

@Component@Slf4jpublicclass KafkaReceiver {        @KafkaListener(topics = {"zhisheng"})        public void listen(ConsumerRecord record) {                Optional kafkaMessage = Optional.ofNullable(record.value());                if (kafkaMessage.isPresent()) {                        Object message = kafkaMessage.get();                        log.info("----------------- record =" + record);                        log.info("------------------ message =" + message);                }        }}

客户端 consumer 接收消息特别简单,直接用 @KafkaListener 注解即可,并在监听中设置监听的 topictopics 是一个数组所以是可以绑定多个主题的,上面的代码中修改为 @KafkaListener(topics={"zhisheng","tian"}) 就可以同时监听两个 topic 的消息了。需要注意的是:这里的 topic 需要和消息发送类 KafkaSender.java 中设置的 topic 一致。

5、启动类 KafkaApplication.java

@SpringBootApplicationpublicclass KafkaApplication {        public static void main(String[] args) {                ConfigurableApplicationContext context = SpringApplication.run(KafkaApplication.class, args);                KafkaSender sender = context.getBean(KafkaSender.class);                for (int i = 0; i < 3; i++) {                                    //调用消息发送类中的消息发送方法                                    sender.send();                        try {                                Thread.sleep(3000);                        } catch (InterruptedException e) {                                e.printStackTrace();                        }                }        }}

6、配置文件 application.properties

#============== kafka ===================# 指定kafka 代理地址,可以多个spring.kafka.bootstrap-servers=192.168.153.135:9092##=============== provider  =======================#spring.kafka.producer.retries=0# 每次批量发送消息的数量spring.kafka.producer.batch-size=16384spring.kafka.producer.buffer-memory=33554432## 指定消息key和消息体的编解码方式spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializerspring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer##=============== consumer  =======================# 指定默认消费者group idspring.kafka.consumer.group-id=test-consumer-groupspring.kafka.consumer.auto-offset-reset=earliestspring.kafka.consumer.enable-auto-commit=truespring.kafka.consumer.auto-commit-interval=100## 指定消息key和消息体的编解码方式spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializerspring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

spring.kafka.bootstrap-servers 后面设置你安装的 Kafka 的机器 IP 地址和端口号 9092。

如果你只是简单整合下,其他的几个默认就好了。

Kafka 设置

在你安装的 Kafka 目录文件下:

启动 zk

使用安装包中的脚本启动单节点 Zookeeper 实例:

bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
启动 Kafka 服务

使用 kafka-server-start.sh 启动 kafka 服务:

bin/kafka-server-start.sh  config/server.properties

启动成功后!

千万注意:记得将你的虚拟机或者服务器关闭防火墙或者开启 Kafka 的端口 9092。

运行

出现这就代表整合成功了!


我们看下 Kafka 中的 topic 列表就

bin/kafka-topics.sh --list --zookeeper localhost:2181

就会发现刚才我们程序中的 zhisheng 已经自己创建了。

感谢各位的阅读,以上就是"SpringBoot Kafka 整合的使用方法"的内容了,经过本文的学习后,相信大家对SpringBoot Kafka 整合的使用方法这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是,小编将为大家推送更多相关知识点的文章,欢迎关注!

消息 方法 整合 使用方法 代码 内容 文件 项目 中设 学习 服务 监听 成功 地址 多个 方式 源码 程序 UTF-8 一致 数据库的安全要保护哪些东西 数据库安全各自的含义是什么 生产安全数据库录入 数据库的安全性及管理 数据库安全策略包含哪些 海淀数据库安全审计系统 建立农村房屋安全信息数据库 易用的数据库客户端支持安全管理 连接数据库失败ssl安全错误 数据库的锁怎样保障安全 软件开发技术学习方法 jsp取出数据库数据 网络安全讲座意义 上海造艺网络技术优化 河北erp 软件开发的行业须知 建立网络打印服务器 保障冬奥会网络安全意义 数据库查询时去重的方法 方舟生存进化山海经服务器号推荐 数据库包含三个文件组怎么弄 杭州百视通网络技术有限公司 青鸟吉他谱软件开发 管理软件开发模板 茶树育种学涉及到哪些组学数据库 深圳基石测评网络技术有限公司 购物网站数据库设计代码 查询数据库表中记录为空值 软件开发在哪里可以找到客户 平顶山三年制计算机网络技术 使用网络安全日记 我的世界单方块服务器攻略 2021网络安全厂家排名 杭州百视通网络技术有限公司 北京博齐世纪网络技术服务 软件开发需求撰写 怎样把多台服务器合成一台 疫情中互联网科技公司的创造性 莆田网络安全教育 网站需要的服务器配置 大学生网络安全活动简报
0