Python中如何使用matplotlib绘制mqtt数据实时图像功能
发表于:2025-11-07 作者:千家信息网编辑
千家信息网最后更新 2025年11月07日,这篇文章主要讲解了"Python中如何使用matplotlib绘制mqtt数据实时图像功能",文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习"Python中
千家信息网最后更新 2025年11月07日Python中如何使用matplotlib绘制mqtt数据实时图像功能matplotlib绘制mqtt数据实时图像
这篇文章主要讲解了"Python中如何使用matplotlib绘制mqtt数据实时图像功能",文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习"Python中如何使用matplotlib绘制mqtt数据实时图像功能"吧!
目录
效果图
mqtt发布
mqtt订阅
matplotlib绘制动态图
matplotlib绘制mqtt数据实时图像
效果图

mqtt发布
本代码中publish是一个死循环,数据一直往外发送。
import randomimport timefrom paho.mqtt import client as mqtt_clientimport jsonfrom datetime import datetimebroker = 'broker.emqx.io'port = 1883topic = "/python/mqtt/li"client_id = f'python-mqtt-{random.randint(0, 1000)}' # 随机生成客户端iddef connect_mqtt(): def on_connect(client, userdata, flags, rc): if rc == 0: print("Connected to MQTT Broker!") else: print("Failed to connect, return code %d\n", rc) client = mqtt_client.Client(client_id) client.on_connect = on_connect client.connect(broker, port) return clientdef publish(client): while True: time.sleep(0.01) msg = json.dumps({"MAC": "0123456789", "samplerate": 12, "sampletime": str(datetime.utcnow().strftime('%Y/%m/%d-%H:%M:%S.%f')[:-3]), "battery": 0.5, "acc": [ [random.randint(200, 350), -random.randint(200, 350), -random.randint(200, 350), random.randint(200, 350), random.randint(200, 350), random.randint(200, 350)], [random.randint(200, 350), -random.randint(200, 350), -random.randint(200, 350), random.randint(200, 350), random.randint(200, 350), random.randint(200, 350)], [random.randint(200, 350), -random.randint(200, 350), -random.randint(200, 350), random.randint(200, 350), random.randint(200, 350), random.randint(200, 350)], [random.randint(200, 350), -random.randint(200, 350), -random.randint(200, 350), random.randint(200, 350), random.randint(200, 350), random.randint(200, 350)], [random.randint(200, 350), -random.randint(200, 350), -random.randint(200, 350), random.randint(200, 350), random.randint(200, 350), random.randint(200, 350)], [random.randint(200, 350), -random.randint(200, 350), -random.randint(200, 350), random.randint(200, 350), random.randint(200, 350), random.randint(200, 350)], [random.randint(200, 350), -random.randint(200, 350), -random.randint(200, 350), random.randint(200, 350), random.randint(200, 350), random.randint(200, 350)], [random.randint(200, 350), -random.randint(200, 350), -random.randint(200, 350), random.randint(200, 350), random.randint(200, 350), random.randint(200, 350)], [random.randint(200, 350), -random.randint(200, 350), -random.randint(200, 350), random.randint(200, 350), random.randint(200, 350), random.randint(200, 350)], [random.randint(200, 350), -random.randint(200, 350), -random.randint(200, 350), random.randint(200, 350), random.randint(200, 350), random.randint(200, 350)], [random.randint(200, 350), -random.randint(200, 350), -random.randint(200, 350), random.randint(200, 350), random.randint(200, 350), random.randint(200, 350)], [random.randint(200, 350), -random.randint(200, 350), -random.randint(200, 350), random.randint(200, 350), random.randint(200, 350), random.randint(200, 350)], ]}) result = client.publish(topic, msg) status = result[0] if status == 0: print(f"Send `{msg}` to topic `{topic}`") else: print(f"Failed to send message to topic {topic}")def run(): client = connect_mqtt() client.loop_start() publish(client)if __name__ == '__main__': run()mqtt订阅
from paho.mqtt import client as mqtt_clientimport timeimport osbroker = 'broker.emqx.io'port = 1883topic = "/python/mqtt/li"def connect_mqtt(client_id): """ MQTT 连接函数。 """ def on_connect(client, userdata, flags, rc): """ 连接回调函数 在客户端连接后被调用,在该函数中可以依据 rc 来判断客户端是否连接成功。 """ if rc == 0: print("Connected to MQTT Broker! return code %d" % rc) else: print("Failed to connect, return code %d\n", rc) client = mqtt_client.Client(client_id) # client.username_pw_set('uname', 'upwd') # 链接mqtt所需的用户名和密码,没有可不写 client.on_connect = on_connect client.connect(broker , port) return clientdef subscribe(client: mqtt_client, a_topic): """ 订阅消息 """ def on_message(client, userdata, msg): """ 消息回调函数 在客户端从 MQTT Broker 收到消息后被调用,在该函数中我们将打印出订阅的 topic 名称以及接收到的消息内容。 * 这里可添加自定义数据处理程序 """ print('From topic : %s\n\tmsg : %s' % (msg.topic, msg.payload.decode())) client.subscribe(topic) client.on_message = on_messagedef run(client_id, topic): client = connect_mqtt(client_id) subscribe(client, topic) client.loop_forever()if __name__ == '__main__': run('test_eartag-003-python-li', 'zk100/gw/#')matplotlib绘制动态图
import matplotlib.pyplot as pltimport numpy as npcount = 100 # 图中最多数据量ax = list(range(count)) # 保存图1数据ay = [0] * 100bx = list(range(count)) # 保存图2数据by = [0] * 100num = count # 计数plt.ion() # 开启一个画图的窗口进入交互模式,用于实时更新数据plt.rcParams['figure.figsize'] = (10, 10) # 图像显示大小plt.rcParams['font.sans-serif'] = ['SimHei'] # 防止中文标签乱码,还有通过导入字体文件的方法plt.rcParams['axes.unicode_minus'] = Falseplt.rcParams['lines.linewidth'] = 0.5 # 设置曲线线条宽度plt.tight_layout()while True: plt.clf() # 清除刷新前的图表,防止数据量过大消耗内存 plt.suptitle("总标题", fontsize=30) # 添加总标题,并设置文字大小 g1 = np.random.random() # 生成随机数画图 # 图表1 ax.append(num) # 追加x坐标值 ay.append(g1) # 追加y坐标值 agraphic = plt.subplot(2, 1, 1) agraphic.set_title('子图表标题1') # 添加子标题 agraphic.set_xlabel('x轴', fontsize=10) # 添加轴标签 agraphic.set_ylabel('y轴', fontsize=20) plt.plot(ax[-count:], ay[-count:], 'g-') # 等于agraghic.plot(ax,ay,'g-') # 图表2 bx.append(num) by.append(g1) bgraghic = plt.subplot(2, 1, 2) bgraghic.set_title('子图表标题2') bgraghic.plot(bx[-count:], by[-count:], 'r^') plt.pause(0.001) # 设置暂停时间,太快图表无法正常显示 num = num + 1matplotlib绘制mqtt数据实时图像
单线程
先启动mqtt订阅服务
mqtt订阅中有阻塞,更新数据后因订阅服务没有结束,导致绘图程序无法绘图
先启动绘图程序
绘图程序本身也是个循环,拿不到mqtt的实时数据,图像无法更新
两个服务加入协程,也不行。具体原因还不知道,容后补充。
mqtt作为线程启动,可解决上述问题
import jsonimport randomfrom paho.mqtt import client as mqtt_clientimport timeimport datetimefrom math import ceil, floorimport matplotlib.pyplot as pltimport _thread# 公共变量broker = 'broker.emqx.io'topic = "/python/mqtt/li"port = 1883client_id = f'python-mqtt-li-{random.randint(0, 100)}'show_num = 300x_num = [-1] # 计数acc1 = []acc2 = []acc3 = []acc4 = []acc5 = []acc6 = []stime = []"""mqtt subscribe topic"""def str_microsecond_datetime2int_13timestamp(str_microsecond_datetime): """将字符串型【毫秒级】格式化时间 转为 【13位】整型时间戳""" datetime_obj = datetime.datetime.strptime(str_microsecond_datetime, "%Y/%m/%d-%H:%M:%S.%f") obj_stamp = int(time.mktime(datetime_obj.timetuple()) * 1000.0 + datetime_obj.microsecond / 1000.0) / 1000.0 return obj_stampdef int2datetime(int_float_timestamp): """ 有小数点:分离小数点,整数转为格式化时间,小数点直接跟在后面 无小数点:从第10位进行分离, 所以本函数只适用于时间戳整数位数大于9且小于11. """ if '.' in str(int_float_timestamp): int_float = str(int_float_timestamp).split('.') date = time.localtime(int(int_float[0])) tempDate = time.strftime("%Y/%m/%d-%H:%M:%S", date) secondafter = '.' + str(int_float[1]) return str(tempDate) + secondafterdef parse_mqttmsg(msg): """解析mqt头数据 MAC samplerate sampletime battery acc""" content = json.loads(msg.payload.decode()) span = 1000 / content['samplerate'] * 10 time_span = [ceil(span) / 10 / 1000, floor(span) / 10 / 1000] sampletime = content['sampletime'] sampletime_int = str_microsecond_datetime2int_13timestamp(sampletime) acc = content['acc'] for i in range(len(acc)): x_num.append(x_num[-1] + 1) acc1.append(acc[i][0]) acc2.append(acc[i][1]) acc3.append(acc[i][2]) acc4.append(acc[i][3]) acc5.append(acc[i][4]) acc6.append(acc[i][5]) if i != 0: sampletime_int += time_span[i % 2] stime.append(int2datetime(round(sampletime_int * 1000, 0) / 1000)) else: stime.append(sampletime) print(x_num[-1], stime[-1], acc1[-1], acc2[-1], acc3[-1], acc4[-1], acc5[-1], acc6[-1])def connect_mqtt(): def on_connect(client, userdata, flags, rc): if rc == 0: print("Connected to MQTT Broker!") else: print("Failed to connect, return code %d\n", rc) pass client = mqtt_client.Client(client_id) client.on_connect = on_connect client.connect(broker, port) return clientdef subscribe(client: mqtt_client): def on_message(client, userdata, msg): # print(f"Received `{msg.payload.decode()}` from `{msg.topic}` topic") parse_mqttmsg(msg) client.subscribe(topic) client.on_message = on_messagedef run(): client = connect_mqtt() subscribe(client) client.loop_forever()""" draw figures """def draw_figure(): plt.ion() # 开启一个画图的窗口进入交互模式,用于实时更新数据 plt.rcParams['figure.figsize'] = (10, 10) # 图像显示大小 plt.rcParams['font.sans-serif'] = ['SimHei'] # 防止中文标签乱码,还有通过导入字体文件的方法 plt.rcParams['axes.unicode_minus'] = False plt.rcParams['lines.linewidth'] = 0.5 # 设置曲线线条宽度 count = 0 while True: plt.clf() # 清除刷新前的图表,防止数据量过大消耗内存 plt.suptitle("总标题", fontsize=30) # 添加总标题,并设置文字大小 plt.tight_layout() # 图表1 agraphic = plt.subplot(2, 1, 1) agraphic.set_title('子图表标题1') # 添加子标题 agraphic.set_xlabel('x轴', fontsize=10) # 添加轴标签 agraphic.set_ylabel('y轴', fontsize=20) plt.plot(x_num[1:][-show_num:], acc1[-show_num:], 'g-') try: xtricks = list(range(len(acc1) - show_num, len(acc1), 10)) # **1** xlabels = [stime[i] for i in xtricks] # **2** plt.xticks(xtricks, xlabels, rotation=15) except: pass # 图表2 bgraghic = plt.subplot(2, 1, 2) bgraghic.set_title('子图表标题2') bgraghic.set_xlabel('x轴', fontsize=10) # 添加轴标签 bgraghic.set_ylabel('y轴', fontsize=20) bgraghic.plot(x_num[1:][-show_num:], acc2[-show_num:], 'r^') plt.pause(0.001) # 设置暂停时间,太快图表无法正常显示 count = count + 1if __name__ == '__main__': # 多线程 _thread.start_new_thread(run, ()) draw_figure()感谢各位的阅读,以上就是"Python中如何使用matplotlib绘制mqtt数据实时图像功能"的内容了,经过本文的学习后,相信大家对Python中如何使用matplotlib绘制mqtt数据实时图像功能这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是,小编将为大家推送更多相关知识点的文章,欢迎关注!
数据
图表
标题
图像
实时
订阅
函数
时间
标签
功能
大小
客户
客户端
小数
小数点
消息
程序
更新
绘图
内容
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
最新的软件开发技术有哪些
软件开发企业净利率
数据库技术常用的概念模型
ssl服务器可能需要更新
黄牛软件开发
揭阳无线软件开发厂家直销
日本数据库设计图
java软件开发工程面试
连接数据库代码一直报错
服务器损坏资料怎么导出
笔记本远程访问服务器在哪儿
桌面记事本软件开发
惠普服务器维修哪家好
泉州企业财务管理软件开发
奉贤区无线网络技术优势
数据库修改数据类型
聊城专业的联想服务器代理
等保三级必备网络安全设备
数据库教学实例
网络安全教育图片小学
企业内部服务器错误
幼儿园网络安全工作台账
饥荒进入别人服务器卡
远离不良网络安全教育
姑苏区品牌网络技术服务电话
软件开发选择语言
服务器上的ftp
阿里云服务器端口查看
迈迪工程软件开发集市
永辉超市的软件开发公司