如何用MQTT协议实现消息的订阅接收?
发表于:2025-12-03 作者:千家信息网编辑
千家信息网最后更新 2025年12月03日,MQTT协议因低延迟、效率高在工业物联网领域使用的频率特别高。前文介绍了如何用代码发送MQTT消息,本文在前文的基础上实现MQTT消息的订阅接收。操作步骤:引入相关的依赖 org.springf
千家信息网最后更新 2025年12月03日如何用MQTT协议实现消息的订阅接收?
MQTT协议因低延迟、效率高在工业物联网领域使用的频率特别高。前文介绍了如何用代码发送MQTT消息,本文在前文的基础上实现MQTT消息的订阅接收。
操作步骤:
- 引入相关的依赖
org.springframework.boot spring-boot-starter-integration org.springframework.integration spring-integration-mqtt org.projectlombok lombok true - 在application.yml配置MQTT服务器信息
server: port: 8090mqtt: host: tcp://127.0.0.1:1883 clientinid: mqttinId clientoutid: mqttoutid topic: virus qoslevel: 1 #MQTT 认证 username: xxx password: xxx timeout: 10000 #20s keepalive: 20- 配置MQTT消息推送配置
package com.favccxx.mqtt.config;import lombok.extern.slf4j.Slf4j;import org.eclipse.paho.client.mqttv3.MqttConnectOptions;import org.springframework.beans.factory.annotation.Value;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.integration.annotation.IntegrationComponentScan;import org.springframework.integration.annotation.ServiceActivator;import org.springframework.integration.channel.DirectChannel;import org.springframework.integration.core.MessageProducer;import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;import org.springframework.integration.mqtt.core.MqttPahoClientFactory;import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;import org.springframework.messaging.Message;import org.springframework.messaging.MessageChannel;import org.springframework.messaging.MessageHandler;import org.springframework.messaging.MessagingException;@Slf4j@Configuration@IntegrationComponentScanpublic class MQTTReceiveConfig { @Value("${mqtt.username}") private String username; @Value("${mqtt.password}") private String password; @Value("${mqtt.host}") private String hostUrl; @Value("${mqtt.clientinid}") private String clientId; @Value("${mqtt.topic}") private String defaultTopic; @Value("${mqtt.timeout}") private int completionTimeout ; //连接超时 @Bean public MqttConnectOptions getReceiverMqttConnectOptions(){ MqttConnectOptions mqttConnectOptions=new MqttConnectOptions(); mqttConnectOptions.setCleanSession(true); mqttConnectOptions.setConnectionTimeout(10); mqttConnectOptions.setKeepAliveInterval(90); mqttConnectOptions.setAutomaticReconnect(true); mqttConnectOptions.setUserName(username); mqttConnectOptions.setPassword(password.toCharArray()); mqttConnectOptions.setServerURIs(new String[]{hostUrl}); mqttConnectOptions.setKeepAliveInterval(2); return mqttConnectOptions; } @Bean public MqttPahoClientFactory mqttClientFactory() { DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory(); factory.setConnectionOptions(getReceiverMqttConnectOptions()); return factory; } //接收通道 @Bean public MessageChannel mqttInputChannel() { return new DirectChannel(); } //配置client,监听的topic @Bean public MessageProducer inbound() { MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(clientId+"_inbound", mqttClientFactory(), defaultTopic); adapter.setCompletionTimeout(completionTimeout); adapter.setConverter(new DefaultPahoMessageConverter()); adapter.setQos(1); adapter.setOutputChannel(mqttInputChannel()); return adapter; } //通过通道获取数据 @Bean @ServiceActivator(inputChannel = "mqttInputChannel") public MessageHandler handler() { return new MessageHandler() { @Override public void handleMessage(Message> message) throws MessagingException { log.info("主题:{},消息接收到的数据:{}", message.getHeaders().get("mqtt_receivedTopic"), message.getPayload()); } }; }}
消息
配置
数据
通道
前文
订阅
主题
代码
信息
基础
工业
效率
服务器
步骤
领域
频率
上实
延迟
推送
服务
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
观摩网络安全竞赛
观安杯网络安全技能竞赛
文件服务器流量控制
有什么物流仿真软件开发
这样批量查找相同的数据库
宜宾展厅互动软件开发公司
中电十五所网络安全
哈尔滨旋律互联网科技骗
数据库关系与关系之间的联系
链接显示说网络安全密码
电脑服务器地址登录不上去
信创数据库技术
网贷是网络安全
通信工程做软件开发
软件开发认证体系
老鼠服务器
数据库怎样分页
域名转换工作的服务器是什么
做棋牌需要哪些服务器
数据库组合键
软件开发部门制度
北京弘建网络技术有限公司
将网络安全工作纳入
共享自动娃娃机软件开发
联想服务器管理平台
ipad建立服务器
服务器镜像安装
互联网加科技的股票是哪些
元器件数据库和物料库
数据库更新动态ip