千家信息网

Kafka中怎么通过整合SpringBoot实现消息发送与消费

发表于:2025-12-03 作者:千家信息网编辑
千家信息网最后更新 2025年12月03日,Kafka中怎么通过整合SpringBoot实现消息发送与消费,相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。kafka和zookeepe
千家信息网最后更新 2025年12月03日Kafka中怎么通过整合SpringBoot实现消息发送与消费

Kafka中怎么通过整合SpringBoot实现消息发送与消费,相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。

kafka和zookeeper集群前边写过了。如果遇到kakfa说没有连接记得把kafka下logs日志都删除了,重新启动kafka集群再启动springboot服务

zookeeper https://my.oschina.net/u/3730149/blog/3071737kafka https://my.oschina.net/u/3730149/blog/3071754
  • 生产者

maven依赖

        4.0.0        com.gzh.kafka.producer        producer        0.0.1-SNAPSHOT        jar        kafka-producer-master        demo project for kafka producer                        org.springframework.boot                spring-boot-starter-parent                1.5.9.RELEASE                                                 UTF-8                UTF-8                2.1.5.RELEASE                1.8                                                        org.springframework.boot                        spring-boot-starter                                                                        org.springframework.kafka                        spring-kafka                        ${spring-kafka.version}                                                        org.springframework.boot                        spring-boot-starter-web                                                        org.springframework.boot                        spring-boot-starter-test                        test                                                                        org.springframework.kafka                        spring-kafka-test                        ${spring-kafka.version}                        test                                                                        io.springfox                        springfox-swagger2                        2.8.0                                                                        io.springfox                        springfox-swagger-ui                        2.8.0                                                                                                        org.springframework.boot                                spring-boot-maven-plugin                                                

application.properties

server.port=8000spring.application.name=kafka-producer#kafka configurationspring.kafka.producer.bootstrap-servers=127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializerspring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer#topickafka.app.topic.foo=test20180430

使用Spring Boot发送Spring Kafka消息

SpringKafka提供了使用Producer的KafkaTemplate类发送消息,并提供将数据发送到Kafka主题的高级操作。 提供异步和同步方法,异步方法返回Future。Spring Boot根据application.properties属性文件中配置的属性自动配置并初始化KafkaTemplate。

为了方便测试发送消息,使用了Spring的定时任务,在类上使用@EnableScheduling 注解开启定时任务,通过@Scheduled注解指定发送消息规则。

     package com.gzh.kafka.producer.component;        import org.slf4j.Logger;        import org.slf4j.LoggerFactory;        import org.springframework.beans.factory.annotation.Autowired;        import org.springframework.beans.factory.annotation.Value;        import org.springframework.kafka.core.KafkaTemplate;        import org.springframework.kafka.support.SendResult;        import org.springframework.scheduling.annotation.EnableScheduling;        import org.springframework.scheduling.annotation.Scheduled;        import org.springframework.stereotype.Component;        import org.springframework.util.concurrent.ListenableFuture;        @Component        @EnableScheduling        public class KafkaMessageProducer {                private static final Logger LOG = LoggerFactory.getLogger(KafkaMessageProducer.class);                @Autowired                private KafkaTemplate kafkaTemplate;                @Value("${kafka.app.topic.foo}")                private String topic;                @Scheduled(cron = "00/5 * * * * ?")                public void send() {                        String message = "Hello World---" + System.currentTimeMillis();                        LOG.info("topic="+topic+",message="+message);                        ListenableFuture> future = kafkaTemplate.send(topic, message);                        future.addCallback(success -> LOG.info("KafkaMessageProducer 发送消息成功!"),                                        fail -> LOG.error("KafkaMessageProducer 发送消息失败!"));                }        }

创建消息生产者启动类

   package com.gzh.kafka.producer;        import org.springframework.boot.SpringApplication;        import org.springframework.boot.autoconfigure.SpringBootApplication;        import org.springframework.boot.context.properties.EnableConfigurationProperties;        @SpringBootApplication        @EnableConfigurationProperties        public class KafkaProducerApplication{                public static void main(String[] args) {                        SpringApplication.run(KafkaProducerApplication.class, args);                }        }

至此,Spring Boot整合Spring Kafka消息生产者应用已经整合完毕。启动zookeeper、kafka各个服务器。启动生产者应用,查看消息生产者应用控制台日志,显示发送消息成功!说明整合OK。

也可以用前段web页面请求的方式

      package com.gzh.kafka.producer.service;        import org.slf4j.Logger;        import org.slf4j.LoggerFactory;        import org.springframework.beans.factory.annotation.Autowired;        import org.springframework.beans.factory.annotation.Value;        import org.springframework.kafka.core.KafkaTemplate;        import org.springframework.kafka.support.SendResult;        import org.springframework.stereotype.Service;        import org.springframework.util.concurrent.ListenableFuture;        @Service        public class KafkaMessageSendService {                private static final Logger LOG = LoggerFactory.getLogger(KafkaMessageSendService.class);                @Autowired                private KafkaTemplate kafkaTemplate;                @Value("${kafka.app.topic.foo}")                private String topic;                public void send(String message){                        LOG.info("topic="+topic+",message="+message);                        ListenableFuture> future = kafkaTemplate.send(topic, message);                        future.addCallback(success -> LOG.info("KafkaMessageProducer 发送消息成功!"),                                        fail -> LOG.error("KafkaMessageProducer 发送消息失败!"));                }        }

界面请求处理controller类

    package com.gzh.kafka.producer.controller;        import org.springframework.beans.factory.annotation.Autowired;        import org.springframework.http.MediaType;        import org.springframework.web.bind.annotation.RequestMapping;        import org.springframework.web.bind.annotation.RequestMethod;        import org.springframework.web.bind.annotation.RequestParam;        import org.springframework.web.bind.annotation.RestController;        import com.gzh.kafka.producer.service.KafkaMessageSendService;        @RestController        @RequestMapping(value="send",produces=MediaType.APPLICATION_JSON_UTF8_VALUE)        public class KafkaMessageSendController {                @Autowired                private KafkaMessageSendService kafkaMessageSendService;                @RequestMapping(value="/sendMessage",method=RequestMethod.POST)                public String send(@RequestParam(required=true) String message){                        try {                                kafkaMessageSendService.send(message);                        } catch (Exception e) {                                return "send failed.";                        }                        return message;                }        }

通过Swagger访问测试Controller服务请求

  • 消费者

maven依赖

        4.0.0        com.gzh.kafka.consumer        consumer        0.0.1-SNAPSHOT        jar        kafka-consumer-master        demo project for kafka consumer                        org.springframework.boot                spring-boot-starter-parent                1.5.9.RELEASE                                                 UTF-8                UTF-8                1.3.4.RELEASE                1.8                                                        org.springframework.boot                        spring-boot-starter                                                                        org.springframework.kafka                        spring-kafka                        ${spring-kafka.version}                                                        org.springframework.boot                        spring-boot-starter-web                                                        org.springframework.boot                        spring-boot-starter-test                        test                                                                        org.springframework.kafka                        spring-kafka-test                        ${spring-kafka.version}                        test                                                                                                        org.springframework.boot                                spring-boot-maven-plugin                                                

注意,这是使用Spring-Kafka时一定要注意版本问题,否则会报各种奇葩错误。Spring官方网站上给出了SpringKafka和kafka-client版本(它的版本号要和kafka服务器的版本保持一致)的对应关系:

application.properties配置

server.port=8001spring.application.name=kafka-consumer#kafka configuration#指定消息被消费之后自动提交偏移量,以便下次继续消费spring.kafka.consumer.enable-auto-commit=true#指定消息组spring.kafka.consumer.group-id=guan#指定kafka服务器地址spring.kafka.consumer.bootstrap-servers=127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094#指定从最近地方开始消费(earliest)spring.kafka.consumer.auto-offset-reset=latestspring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializerspring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer#topickafka.app.topic.foo=test20180430

通过使用@KafkaListener来注解一个方法Spring Kafka会自动创建一个消息监听器容器。使用该注解,并指定要消费的topic(也可以指定消费组以及分区号,支持正则表达式匹配),这样,消费者一旦启动,就会监听kafka服务器上的topic,实时进行消费消息。

      package com.gzh.kafka.consumer.service;        import org.slf4j.Logger;        import org.slf4j.LoggerFactory;        import org.springframework.kafka.annotation.KafkaListener;        import org.springframework.messaging.MessageHeaders;        import org.springframework.messaging.handler.annotation.Headers;        import org.springframework.messaging.handler.annotation.Payload;        import org.springframework.stereotype.Component;        @Component        public class KafkaMessageConsumer {                private static final Logger LOG = LoggerFactory.getLogger(KafkaMessageConsumer.class);                @KafkaListener(topics={"${kafka.app.topic.foo}"})                public void receive(@Payload String message, @Headers MessageHeaders headers){                        LOG.info("KafkaMessageConsumer 接收到消息:"+message);                        headers.keySet().forEach(key->LOG.info("{}: {}",key,headers.get(key)));                }        }

创建消息消费者启动类

   package com.gzh.kafka.consumer;        import org.springframework.boot.SpringApplication;        import org.springframework.boot.autoconfigure.SpringBootApplication;        import org.springframework.boot.context.properties.EnableConfigurationProperties;        @SpringBootApplication        @EnableConfigurationProperties        public class KafkaConsumerApplication {                public static void main(String[] args) {                        SpringApplication.run(KafkaConsumerApplication.class, args);                }        }

消费者应用已经完成,接下来让我们验证Spring Kafka消息发送和接收效果。先依次启动zookeeper、kafka服务器,然后在启动生产者(kafka-producer-master)应用,再启动消费者(kafka-consumer-master)应用,然后观察生产者和消费者启动类日志: 显示接受消息成功!

看完上述内容,你们掌握Kafka中怎么通过整合SpringBoot实现消息发送与消费的方法了吗?如果还想学到更多技能或想了解更多相关内容,欢迎关注行业资讯频道,感谢各位的阅读!

0