千家信息网

如何理解Python MQTT异步框架HBMQTT

发表于:2025-12-01 作者:千家信息网编辑
千家信息网最后更新 2025年12月01日,如何理解Python MQTT异步框架HBMQTT,针对这个问题,这篇文章详细介绍了相对应的分析和解答,希望可以帮助更多想解决这个问题的小伙伴找到更简单易行的方法。什么是异步CPU 的速度远远快于磁盘
千家信息网最后更新 2025年12月01日如何理解Python MQTT异步框架HBMQTT

如何理解Python MQTT异步框架HBMQTT,针对这个问题,这篇文章详细介绍了相对应的分析和解答,希望可以帮助更多想解决这个问题的小伙伴找到更简单易行的方法。

什么是异步

CPU 的速度远远快于磁盘、网络等 IO 操作,而在一个线程中,无论 CPU 执行得再快,遇到 IO 操作时,都得停下来等待读写完成,这无疑浪费了许多时间。

为了解决这个问题,Python 加入了异步 IO 的特性。在 Python 3.4 中,正式将 asyncio 纳入标准库中,并在 Python 3.5 中,加入了 async/await 关键字。用户可以很轻松的使用在函数前加入 async 关键字,使函数变成异步函数。

在 Python 的 MQTT 客户端库中,HBMQTT 是最早支持异步 IO 的 Python MQTT 库。

HBMQTT 库

HBMQTT 是基于 Python 编写的开源库,实现了 MQTT 3.1.1 协议,特性如下:

  • 支持 QoS 0, QoS 1 以及 QoS 2 消息

  • 客户端自动重连

  • 支持 TCP 和 WebSocket

  • 支持 SSL

  • 支持插件系统

下面我们将演示如何使用 Python MQTT 异步框架 - HBMQTT,轻松实现一个具备 MQTT 发布、订阅功能的异步 Demo。

项目初始化

确定 Python 版本

本项目使用 Python 3.6 进行开发测试,读者可用如下命令确认 Python 的版本。

因为需要使用 async 关键字,需要确保 Python 版本不低于 Python 3.5

➜  ~ python3 --versionPython 3.6.7

使用 Pip 安装 HBMQTT 库

Pip 是 Python 的包管理工具,该工具提供了对 Python 包的查找、下载、安装和卸载功能。

pip3 install -i https://pypi.doubanio.com/simple hbmqtt

连接 MQTT 服务器

本文将使用 EMQ X 提供的免费公共 MQTT 服务器,该服务基于 EMQ X 的MQTT 物联网云平台创建。服务器接入信息如下:

  • Broker: broker.emqx.io

  • TCP Port: 1883

  • Websocket Port: 8083

首先,导入 MQTT 客户端库。

from hbmqtt.client import MQTTClientclient = MQTTClient()# 连接服务器client.connect('mqtt://broker.emqx.io/')# 断开连接client.disconnect()

异步写法如下:

async def test_pub():    client = MQTTClient()    await client.connect('mqtt://broker.emqx.io/')    await client.disconnect()

发布消息

发布消息函数为 MQTTClient 类的 publish 函数。

client = MQTTClient()# 函数的三个参数分别为主题、消息内容、QoSclient.publish('a/b', b'TEST MESSAGE WITH QOS_0', qos=QOS_0)

异步写法如下:

async def test_pub():    client = MQTTClient()    await Client.connect('mqtt://broker.emqx.io/')    await asyncio.gather(        client.publish('a/b', b'TEST MESSAGE WITH QOS_0', qos=QOS_0),        client.publish('a/b', b'TEST MESSAGE WITH QOS_1', qos=QOS_1),        client.publish('a/b', b'TEST MESSAGE WITH QOS_2', qos=QOS_2)    )    logging.info("messages published")    await Client.disconnect()

在这段代码中,我们将三个发送消息函数放进 asyncio 的任务列表里,它们将会依次被运行。当所有任务都完成后,断开连接。

订阅消息

定阅消息函数为 MQTTClient 类中的 subscribe 函数。

client = MQTTClient()# 订阅client.subscribe([  ('topic/0', QOS_0),  ('topic/1', QOS_1),  ])# 取消订阅client.unsubscribe([  ('topic/0', QOS_0),]

异步写法如下:

async def test_sub():    client = MQTTClient()    await client.connect('mqtt://broker.emqx.io/')    await client.subscribe([            ('a/b', QOS_1),         ])    for i in range(0, 10):        message = await client.deliver_message()        packet = message.publish_packet        print(f"{i}:  {packet.variable_header.topic_name} => {packet.payload.data}")    await client.disconnect()

在这段代码中,我们在接收消息时设置了 await 等待,当代码执行到如下位置时,CPU 会先去执行其它任务,直到有消息传达,再将其打印。

message = await client.deliver_message()

最终,程序会等待 10 次消息接收,然后关闭连接。

完整代码

消息订阅代码

# sub.py# python 3.6+import asyncioimport loggingfrom hbmqtt.client import MQTTClientfrom hbmqtt.mqtt.constants import QOS_1async def test_sub():    client = MQTTClient()    await client.connect('mqtt://broker.emqx.io/')    await client.subscribe([        ('a/b', QOS_1),    ])    for i in range(0, 10):        message = await client.deliver_message()        packet = message.publish_packet        print(f"{i}:  {packet.variable_header.topic_name} => {packet.payload.data}")    await client.disconnect()if __name__ == '__main__':    formatter = "[%(asctime)s] %(name)s {%(filename)s:%(lineno)d} %(levelname)s - %(message)s"    logging.basicConfig(level=logging.INFO, format=formatter)    asyncio.run(test_sub())

消息发布代码

# pub.py# python 3.6+import asyncioimport loggingfrom hbmqtt.client import MQTTClientfrom hbmqtt.mqtt.constants import QOS_0, QOS_1, QOS_2async def test_pub():    client = MQTTClient()    await client.connect('mqtt://broker.emqx.io/')    await asyncio.gather(        client.publish('a/b', b'TEST MESSAGE WITH QOS_0', qos=QOS_0),        client.publish('a/b', b'TEST MESSAGE WITH QOS_1', qos=QOS_1),        client.publish('a/b', b'TEST MESSAGE WITH QOS_2', qos=QOS_2)    )    logging.info("messages published")    await client.disconnect()if __name__ == '__main__':    formatter = "[%(asctime)s] %(name)s {%(filename)s:%(lineno)d} %(levelname)s - %(message)s"    logging.basicConfig(level=logging.INFO, format=formatter)    asyncio.run(test_pub())

测试

消息发布

运行 MQTT 消息发布代码,我们将看到客户端连接成功,并且成功发布消息。

如下为 MQTT X 客户端成功接收到 HBMQTT 客户端发布的消息:

消息订阅

运行 MQTT 消息订阅代码,我们将看到客户端连接成功,此时客户端正在等待消息进入

使用 MQTT X 客户端连接 broker.emqx.io,然后向主题 a/b 发送 10 次消息

回到终端,我们看到客户端接收并打印消息,并且在收到 10 条消息后,主动退出了程序。

至此,我们完成了 HBMQTT 库连接到公共 MQTT 服务器,并实现了测试客户端与 MQTT 服务器的连接、消息发布和订阅。通过使用 Python 异步 IO 执行消息的发送接收,可以帮助我们实现更加高效的 MQTT 客户端。

关于如何理解Python MQTT异步框架HBMQTT问题的解答就分享到这里了,希望以上内容可以对大家有一定的帮助,如果你还有很多疑惑没有解开,可以关注行业资讯频道了解更多相关知识。

消息 客户 客户端 函数 代码 订阅 服务 服务器 支持 成功 问题 框架 任务 关键 关键字 写法 帮助 测试 运行 三个 数据库的安全要保护哪些东西 数据库安全各自的含义是什么 生产安全数据库录入 数据库的安全性及管理 数据库安全策略包含哪些 海淀数据库安全审计系统 建立农村房屋安全信息数据库 易用的数据库客户端支持安全管理 连接数据库失败ssl安全错误 数据库的锁怎样保障安全 网络安全学院的师资力量 软件开发 付款周期 海安无线网络技术在线咨询 让4g网络安全的策略 深圳市牛班网络技术有限公司 农业银行软件开发中心下载 幼儿园网络安全知识感触 江西家用软件开发单价 sql还原数据库失败 浙江常用软件开发现价 西南民族大学数据库原理考试 使命召唤ol服务器连接失灵 如何设计小说的数据库表 白山市网络安全培训班 网络安全与安防 加强网络安全防范学校 计算机网络技术服务宣传语 西安做软件开发外包的外资企业 学生评估平台软件开发 网络安全法明确国家实行什么战略 北京工业软件开发哪家专业 深圳市力源网络技术有限公司 网络技术应用教学的调研报告 数据库设计方案模板 实验用mpp数据库有哪些 分布式数据库中的所有权限 虹口区app软件开发定做价格 微信转发谣言 网络安全 数据库恢复技术导出表 智能手表软件开发流程
0