python mqtt客户端转存数据库_Python 3 MQTT客户端将接收到的负载存储在Sqlite-Open DB中一次,存储多次,最后关闭DB?...
我有一个连接到MQTT并订阅主题的Python 3.6代码。每次触发回调函数“on_message”时,它都会实例化一个类,该类有一个方法,该方法执行以下操作:打开db文件,保存接收到的数据,关闭db文件。上面描述的Python脚本几乎可以正常工作。它每秒接收大约7个MQTT消息,因此对于每个消息,它需要[Open_DB - Save_Data - Close_DB].有一些消息正在获取PUB..
我有一个连接到MQTT并订阅主题的Python 3.6代码。每次触发回调函数“on_message”时,它都会实例化一个类,该类有一个方法,该方法执行以下操作:打开db文件,保存接收到的数据,关闭db文件。
上面描述的Python脚本几乎可以正常工作。它每秒接收大约7个MQTT消息,因此对于每个消息,它需要
[Open_DB - Save_Data - Close_DB]
. 有一些消息正在获取PUBACK但未保存,可能是由于一些不必要的操作,因此我要改进:
1. MyDbClass.open_db_file()
2. MyDbClass.save_data()
3. MyDbClass.close_db_file()
你可能猜到的问题是
MyDbClass.save_data()
在“on_message”回调函数中,即使对象已放置在全局变量上。以下是我为便于阅读而整理的建议想法的非工作代码:
# -----------------------------
import paho.mqtt.client as mqtt
import time
import json
import sqlite3
全局变量
db_object = ""
class MyDbClass():
def __init__(self):
pass
def open_db_file(self, dbfile):
self.db_conn = sqlite3.connect(db_file)
return self.db_conn
def save_data(self, json_data):
self.time_stamp = time.strftime('%Y%m%d%H%M%S')
self.data = json.loads(json_data)
self.sql = '''INSERT INTO trans_reqs (received, field_a, field_b, field_c) \
VALUES (?, ?, ?, ?)'''
self.fields_values = ( self.time_stamp, self.data['one'], self.data['two'], self.data['three']] )
self.cur = self.db_conn.cursor()
self.cur.execute(self.sql, self.fields_values)
self.db_conn.commit()
def close_db_file(self):
self.cur.close()
self.db_conn.close()
def on_mqtt_message(client, userdata, msg):
global db_object
m_decode = msg.payload.decode("utf-8","ignore")
db_object.save_data(m_decode)
def main():
global db_object
db_file = "my_filename.sqlite"
db_object = MyDbClass.open_db_file(db_file)
# MQTT -- Set varibles
broker_address= "..."
port = 1883
client_id = "..."
sub_topic = "..."
sub_qos = 1
# MQTT -- Instanciate the MQTT Client class and set callbacks
client = mqtt.Client(client_id)
client.on_connect = on_mqtt_connect
client.on_disconnect = on_mqtt_disconnect
client.on_message = on_mqtt_message
client.on_log = on_mqtt_log
client.clean_session = True
#client.username_pw_set(usr, password=pwd) #set username and password
print('Will connect to broker ', broker_address)
client.connect(broker_address, port=port, keepalive=45 )
client.loop_start()
client.subscribe(sub_topic, sub_qos)
try:
while True:
time.sleep(.1)
except KeyboardInterrupt:
# Disconnects MQTT
client.disconnect()
client.loop_stop()
print("....................................")
print("........ User Interrupted ..........")
print("....................................")
db_object.close_db_file()
client.loop_stop()
client.disconnect()
if __name__ == "__main__":
main()
任何关于如何做到这一点的帮助将不胜感激!
魔乐社区(Modelers.cn) 是一个中立、公益的人工智能社区,提供人工智能工具、模型、数据的托管、展示与应用协同服务,为人工智能开发及爱好者搭建开放的学习交流平台。社区通过理事会方式运作,由全产业链共同建设、共同运营、共同享有,推动国产AI生态繁荣发展。
更多推荐


所有评论(0)