python mqtt 处理消息返回http报文,mqtt 发送并接受message的python实现
需要使用两个知识点,一个是多线程,一个是队列queue。需要先开一个线程,该线程是订阅topic,然后等待消息。另外一个线程,就是发布消息。比较坑的是mqtt的python包,只说了回调函数on_message,但是该回调是通过subscribe函数调用的,无法返回message到主程序。经过多次尝试,最后想到用queue,在on_message中put message,然后在主函数中get me
需要使用两个知识点,一个是多线程,一个是队列queue。
需要先开一个线程,该线程是订阅topic,然后等待消息。
另外一个线程,就是发布消息。
比较坑的是mqtt的python包,只说了回调函数on_message,但是该回调是通过subscribe函数调用的,无法返回message到主程序。
经过多次尝试,最后想到用queue,在on_message中put message,然后在主函数中get message,再在返回message。
on_message函数将消息写入queue
def on_message(client, userdata, msg):
mss = str((msg.payload).decode('utf-8'))
print(mss)
mp.put(mss)
主函数get后返回给主程序,并且开了两个线程
def mqtt_send_check(client, topic, msg):
client.loop_start()
thread_sub = threading.Thread(target=mqtt_subscribe, args=(client,topic))
thread_sub.start()
thread_pub = threading.Thread(target=mqtt_publish, args=(client,topic,msg))
thread_pub.start()
time.sleep(4)
client.loop_stop()
return(mp.get())
pytest测试程序
class Test_comm:
TOPIC_REPORT = PRODUCT_KEY + "/" + DEVICE_NAME + "/abc"
def test_comm(self):
MSG = "aaa"
client = mqtt_connect(MQTT_SERVER_ADDR, MQTT_SERVER_PORT,PRODUCT_KEY, DEVICE_NAME,DEVICE_SECRET, TIMESTAMP)
pps = mqtt_send_check(client, self.TOPIC_REPORT, MSG)
print('pps:',pps)
pytest程序通过这种方式,可以得到message消息,实现断言了。
魔乐社区(Modelers.cn) 是一个中立、公益的人工智能社区,提供人工智能工具、模型、数据的托管、展示与应用协同服务,为人工智能开发及爱好者搭建开放的学习交流平台。社区通过理事会方式运作,由全产业链共同建设、共同运营、共同享有,推动国产AI生态繁荣发展。
更多推荐



所有评论(0)