我有一个连接到MQTT并订阅主题的Python 3.6代码。每次触发回调函数“on_message”时,它都会实例化一个类,该类有一个方法,该方法执行以下操作:打开db文件,保存接收到的数据,关闭db文件。

上面描述的Python脚本几乎可以正常工作。它每秒接收大约7个MQTT消息,因此对于每个消息,它需要

[Open_DB - Save_Data - Close_DB]

. 有一些消息正在获取PUBACK但未保存,可能是由于一些不必要的操作,因此我要改进:

1. MyDbClass.open_db_file()

2. MyDbClass.save_data()

3. MyDbClass.close_db_file()

你可能猜到的问题是

MyDbClass.save_data()

在“on_message”回调函数中,即使对象已放置在全局变量上。以下是我为便于阅读而整理的建议想法的非工作代码:

# -----------------------------

import paho.mqtt.client as mqtt

import time

import json

import sqlite3

全局变量

db_object = ""

class MyDbClass():

def __init__(self):

pass

def open_db_file(self, dbfile):

self.db_conn = sqlite3.connect(db_file)

return self.db_conn

def save_data(self, json_data):

self.time_stamp = time.strftime('%Y%m%d%H%M%S')

self.data = json.loads(json_data)

self.sql = '''INSERT INTO trans_reqs (received, field_a, field_b, field_c) \

VALUES (?, ?, ?, ?)'''

self.fields_values = ( self.time_stamp, self.data['one'], self.data['two'], self.data['three']] )

self.cur = self.db_conn.cursor()

self.cur.execute(self.sql, self.fields_values)

self.db_conn.commit()

def close_db_file(self):

self.cur.close()

self.db_conn.close()

def on_mqtt_message(client, userdata, msg):

global db_object

m_decode = msg.payload.decode("utf-8","ignore")

db_object.save_data(m_decode)

def main():

global db_object

db_file = "my_filename.sqlite"

db_object = MyDbClass.open_db_file(db_file)

# MQTT -- Set varibles

broker_address= "..."

port = 1883

client_id = "..."

sub_topic = "..."

sub_qos = 1

# MQTT -- Instanciate the MQTT Client class and set callbacks

client = mqtt.Client(client_id)

client.on_connect = on_mqtt_connect

client.on_disconnect = on_mqtt_disconnect

client.on_message = on_mqtt_message

client.on_log = on_mqtt_log

client.clean_session = True

#client.username_pw_set(usr, password=pwd) #set username and password

print('Will connect to broker ', broker_address)

client.connect(broker_address, port=port, keepalive=45 )

client.loop_start()

client.subscribe(sub_topic, sub_qos)

try:

while True:

time.sleep(.1)

except KeyboardInterrupt:

# Disconnects MQTT

client.disconnect()

client.loop_stop()

print("....................................")

print("........ User Interrupted ..........")

print("....................................")

db_object.close_db_file()

client.loop_stop()

client.disconnect()

if __name__ == "__main__":

main()

任何关于如何做到这一点的帮助将不胜感激!

Logo

魔乐社区(Modelers.cn) 是一个中立、公益的人工智能社区,提供人工智能工具、模型、数据的托管、展示与应用协同服务,为人工智能开发及爱好者搭建开放的学习交流平台。社区通过理事会方式运作,由全产业链共同建设、共同运营、共同享有,推动国产AI生态繁荣发展。

更多推荐