python 获取 opc da 数据的两种方式
为实现__reduce__或方法,定义其序列化行为。适用场景:需要保留的功能时。优先转换类型:将转换为标准datetime或字符串(最简单可靠)。尝试dill:若必须保留对象类型,使用dill可能解决序列化问题。重构设计:避免在多进程间传递复杂对象,改用共享内存或外部存储。自定义序列化:为特殊对象实现序列化方法(需额外代码)。如果问题仍未解决,请检查是否还有
OpenOPC.py源码中,read方法
def read(self, tags=None, group=None, size=None, pause=0, source='hybrid', update=-1, timeout=5000, sync=False, include_error=False, rebuild=False):
"""Return list of (value, quality, time) tuples for the specified tag(s)"""
sync=True时获取数据调用的方法:
try:
values, errors, qualities, timestamps = opc_group.SyncRead(data_source, len(server_handles)-1, server_handles)
except pythoncom.com_error as err:
error_msg = 'SyncRead: %s' % self._get_error_str(err)
raise OPCError(error_msg)
sync=False时执行了下面的代码,不是直接读,而是监听数据:
if new_group:
opc_group.IsSubscribed = 1
opc_group.IsActive = 1
if not sync:
if self.trace: self.trace('WithEvents(%s)' % opc_group.Name)
global current_client
current_client = self
self._group_hooks[opc_group.Name] = win32com.client.WithEvents(opc_group, GroupEvents)
class GroupEvents:
def __init__(self):
self.client = current_client
def OnDataChange(self, TransactionID, NumItems, ClientHandles, ItemValues, Qualities, TimeStamps):
converted_timestamps = [str(ts) for ts in TimeStamps]
self.client.callback_queue.put((TransactionID, ClientHandles, ItemValues, Qualities, converted_timestamps))
注意,这里的
converted_timestamps = [str(ts) for ts in TimeStamps]
是我加到源码里去的,源码中本身TimeStamps的格式是
pywintypes.datetime(2025, 8, 28, 1, 28, 8, 499000, tzinfo=TimeZoneInfo('GMT Standard Time', True))
callback_queue 中使用 pywintypes.datetime 会报错:
_pickle.PicklingError: Can't pickle <class 'pywintypes.datetime'>: attribute lookup datetime on pywintypes failed
错误信息表明,在尝试使用 Python 的
multiprocessing模块时,遇到了_pickle.PicklingError,具体是无法序列化pywintypes.datetime类型的对象。这个问题通常发生在多进程编程中,当主进程尝试将包含特定类型对象(如pywintypes.datetime)的数据传递给子进程时,由于这些对象无法被pickle序列化,导致进程间通信失败。问题原因分析
pywintypes.datetime不可序列化:
pywintypes.datetime是pywin32库中的一个类型,用于处理 Windows 系统特定的日期时间格式。它继承自 Python 的datetime,但默认情况下无法被pickle序列化,因为其底层实现依赖于 Windows API,无法跨进程传输。在多进程编程中,
multiprocessing模块依赖pickle序列化来传递数据。如果数据中包含不可序列化的对象(如pywintypes.datetime),就会抛出PicklingError。多进程通信的限制:
multiprocessing的队列(Queue)或进程池(Pool)在传递数据时,要求所有对象必须可序列化。pywintypes.datetime不满足这一条件。解决方案
方法一:转换为可序列化的
datetime对象将
pywintypes.datetime转换为标准的 Pythondatetime对象,因为后者可以被pickle序列化。例如:from datetime import datetime import pywintypes # 假设 pyw_dt 是 pywintypes.datetime 对象 pyw_dt = pywintypes.Time(2025, 8, 28) # 示例对象 std_dt = datetime( year=pyw_dt.year, month=pyw_dt.month, day=pyw_dt.day, hour=pyw_dt.hour, minute=pyw_dt.minute, second=pyw_dt.second ) # 现在 std_dt 可以被 pickle 序列化,适合传递给子进程适用场景:如果
pywintypes.datetime仅用于存储时间信息,而不依赖其 Windows 特定功能。方法二:使用
dill替代pickle
dill是一个扩展的序列化库,支持更多类型的对象(包括某些不可pickle的类型)。安装dill后,可以通过以下方式传递数据:import dill import multiprocessing def worker(dt): print(dt) if __name__ == '__main__': pyw_dt = pywintypes.Time(2025, 8, 28) # 示例对象 serialized = dill.dumps(pyw_dt) # 使用 dill 序列化 p = multiprocessing.Process(target=worker, args=(dill.loads(serialized),)) p.start() p.join()注意:
dill可能无法解决所有类型的序列化问题,需测试确认。方法三:避免直接传递
pywintypes.datetime如果可能,重构代码逻辑,避免在多进程间传递
pywintypes.datetime。例如:
传递时间戳或字符串:将日期时间转换为 Unix 时间戳或 ISO 格式字符串(如
"2025-08-28T12:00:00"),在子进程中重新解析。共享内存或数据库:使用
multiprocessing.Value或外部存储(如 SQLite)共享数据。方法四:自定义序列化方法
为
pywintypes.datetime实现__reduce__或__getstate__/__setstate__方法,定义其序列化行为。例如:class SerializablePywintypesDateTime: def __init__(self, dt): self.dt = dt def __getstate__(self): return self.dt.year, self.dt.month, self.dt.day, self.dt.hour, self.dt.minute, self.dt.second def __setstate__(self, state): year, month, day, hour, minute, second = state self.dt = pywintypes.Time(year, month, day, hour, minute, second)适用场景:需要保留
pywintypes.datetime的功能时。总结
优先转换类型:将
pywintypes.datetime转换为标准datetime或字符串(最简单可靠)。尝试
dill:若必须保留对象类型,使用dill可能解决序列化问题。重构设计:避免在多进程间传递复杂对象,改用共享内存或外部存储。
自定义序列化:为特殊对象实现序列化方法(需额外代码)。
如果问题仍未解决,请检查是否还有其他不可序列化的对象(如文件句柄、线程等)。
while tx_id != self._tx_id:
now = time.time() * 1000
if now - start > timeout:
raise TimeoutError('Callback: Timeout waiting for data')
if self.callback_queue.empty():
pythoncom.PumpWaitingMessages()
else:
tx_id, handles, values, qualities, timestamps = self.callback_queue.get()
异步读取的工作原理
当
sync=False时,read方法并不是去直接“读”数据,而是设置一个订阅(Subscription)并尝试获取自上次查询以来缓存的数据变更。
设置订阅:代码通过
WithEvents为 OPC 组挂接了事件钩子 (GroupEvents)。当组内任何标签的值、质量或时间戳发生变化时,OPC 服务器会主动通知客户端,触发OnDataChange事件,该事件会将数据放入callback_queue中。获取数据:
read方法随后检查callback_queue。如果队列里有数据,说明在调用read之前已经有数据变化发生并被缓存了,它就会取出并返回这些数据。如果队列是空的,它只会调用pythoncom.PumpWaitingMessages()处理一下 Windows 消息队列(看看有没有“正在路上”的回调消息),然后就返回了,很可能返回的是空列表或旧数据。常见问题:
callback_queue一直是空的最常见的原因有以下几个:
数据尚未发生变化:这是最可能的原因。异步读取 (
sync=False) 依赖于数据的变化。如果从你启动订阅到你调用read的这段时间内,你请求的标签的值没有任何改变,OPC 服务器就不会发送回调通知,callback_queue自然就是空的。
同步读取 (
sync=True):直接向服务器询问当前值,无论它是否变化。异步读取 (
sync=False):询问“自上次联系后,有什么新变化吗?”。如果没变化,就没什么可返回的。订阅参数问题:
update参数设置了服务器向客户端发送数据更新的频率(毫秒)。如果update值设得非常大(比如默认的-1,表示使用组的默认更新速率),而数据变化频率低于这个速率,你可能在调用read时还不到下一次更新的时间。连接或组配置问题:虽然你同步读取能成功,但异步订阅可能需要额外的正确设置才能工作。
总结
根本原因:异步读取 (
sync=False) 依赖于数据变化产生的事件通知。没有变化,就没有数据进入callback_queue。
同步读取 (sync=True)
import OpenOPC
import time
opc = OpenOPC.client()
opc.connect('SUPCON.SCRTCore.1', '172.133.1.8')
tags = opc.list('FT_*')
try:
count_ = 1
while count_ < 10:
data = opc.read(tags, sync=True)
print(f"data: {data[0]}, {len(data)}, tags: {len(tags)}")
count_ += 1
time.sleep(3)
finally:
opc.close()
异步读取 (sync=False)
import OpenOPC
from libs import opcTags
def data_change_handler():
opc = OpenOPC.client()
opc.connect('SUPCON.SCRTCore.1', '172.133.1.8')
try:
count_ = 1
while count_ < 10:
data = opc.read(opcTags.pv_tags, group='MonitorGroup', sync=False, update=3000, timeout=5000) # False True
print(f"data: {data[0]}, {len(data)}, tags: {len(opcTags.pv_tags)}")
count_ += 1
finally:
opc.remove('MonitorGroup')
opc.close()
data_change_handler()
资料:
魔乐社区(Modelers.cn) 是一个中立、公益的人工智能社区,提供人工智能工具、模型、数据的托管、展示与应用协同服务,为人工智能开发及爱好者搭建开放的学习交流平台。社区通过理事会方式运作,由全产业链共同建设、共同运营、共同享有,推动国产AI生态繁荣发展。
更多推荐


所有评论(0)