OpenOPC.py源码中,read方法

   def read(self, tags=None, group=None, size=None, pause=0, source='hybrid', update=-1, timeout=5000, sync=False, include_error=False, rebuild=False):
      """Return list of (value, quality, time) tuples for the specified tag(s)"""

sync=True时获取数据调用的方法:

                   try:
                      values, errors, qualities, timestamps = opc_group.SyncRead(data_source, len(server_handles)-1, server_handles)
                   except pythoncom.com_error as err:
                      error_msg = 'SyncRead: %s' % self._get_error_str(err)
                      raise OPCError(error_msg)

sync=False时执行了下面的代码,不是直接读,而是监听数据:

            if new_group:
               opc_group.IsSubscribed = 1
               opc_group.IsActive = 1
               if not sync:
                  if self.trace: self.trace('WithEvents(%s)' % opc_group.Name)
                  global current_client
                  current_client = self
                  self._group_hooks[opc_group.Name] = win32com.client.WithEvents(opc_group, GroupEvents)
class GroupEvents:
   def __init__(self):
      self.client = current_client
        
   def OnDataChange(self, TransactionID, NumItems, ClientHandles, ItemValues, Qualities, TimeStamps):
      converted_timestamps = [str(ts) for ts in TimeStamps]
      self.client.callback_queue.put((TransactionID, ClientHandles, ItemValues, Qualities, converted_timestamps))

注意,这里的

converted_timestamps = [str(ts) for ts in TimeStamps]

是我加到源码里去的,源码中本身TimeStamps的格式是

pywintypes.datetime(2025, 8, 28, 1, 28, 8, 499000, tzinfo=TimeZoneInfo('GMT Standard Time', True))

callback_queue 中使用 pywintypes.datetime 会报错:

_pickle.PicklingError: Can't pickle <class 'pywintypes.datetime'>: attribute lookup datetime on pywintypes failed

错误信息表明,在尝试使用 Python 的 multiprocessing模块时,遇到了 _pickle.PicklingError,具体是无法序列化 pywintypes.datetime类型的对象。这个问题通常发生在多进程编程中,当主进程尝试将包含特定类型对象(如 pywintypes.datetime)的数据传递给子进程时,由于这些对象无法被 pickle序列化,导致进程间通信失败。

问题原因分析

  1. pywintypes.datetime不可序列化​​:

    • pywintypes.datetimepywin32库中的一个类型,用于处理 Windows 系统特定的日期时间格式。它继承自 Python 的 datetime,但默认情况下无法被 pickle序列化,因为其底层实现依赖于 Windows API,无法跨进程传输。

    • 在多进程编程中,multiprocessing模块依赖 pickle序列化来传递数据。如果数据中包含不可序列化的对象(如 pywintypes.datetime),就会抛出 PicklingError

  2. ​多进程通信的限制​​:

    • multiprocessing的队列(Queue)或进程池(Pool)在传递数据时,要求所有对象必须可序列化。pywintypes.datetime不满足这一条件。

解决方案

方法一:转换为可序列化的 datetime对象

pywintypes.datetime转换为标准的 Python datetime对象,因为后者可以被 pickle序列化。例如:

from datetime import datetime
import pywintypes

# 假设 pyw_dt 是 pywintypes.datetime 对象
pyw_dt = pywintypes.Time(2025, 8, 28)  # 示例对象
std_dt = datetime(
    year=pyw_dt.year,
    month=pyw_dt.month,
    day=pyw_dt.day,
    hour=pyw_dt.hour,
    minute=pyw_dt.minute,
    second=pyw_dt.second
)

# 现在 std_dt 可以被 pickle 序列化,适合传递给子进程

​适用场景​​:如果 pywintypes.datetime仅用于存储时间信息,而不依赖其 Windows 特定功能。

方法二:使用 dill替代 pickle

dill是一个扩展的序列化库,支持更多类型的对象(包括某些不可 pickle的类型)。安装 dill后,可以通过以下方式传递数据:

import dill
import multiprocessing

def worker(dt):
    print(dt)

if __name__ == '__main__':
    pyw_dt = pywintypes.Time(2025, 8, 28)  # 示例对象
    serialized = dill.dumps(pyw_dt)  # 使用 dill 序列化
    p = multiprocessing.Process(target=worker, args=(dill.loads(serialized),))
    p.start()
    p.join()

​注意​​:dill可能无法解决所有类型的序列化问题,需测试确认。

方法三:避免直接传递 pywintypes.datetime

如果可能,重构代码逻辑,避免在多进程间传递 pywintypes.datetime。例如:

  • ​传递时间戳或字符串​​:将日期时间转换为 Unix 时间戳或 ISO 格式字符串(如 "2025-08-28T12:00:00"),在子进程中重新解析。

  • ​共享内存或数据库​​:使用 multiprocessing.Value或外部存储(如 SQLite)共享数据。

方法四:自定义序列化方法

pywintypes.datetime实现 __reduce____getstate__/__setstate__方法,定义其序列化行为。例如:

class SerializablePywintypesDateTime:
    def __init__(self, dt):
        self.dt = dt

    def __getstate__(self):
        return self.dt.year, self.dt.month, self.dt.day, self.dt.hour, self.dt.minute, self.dt.second

    def __setstate__(self, state):
        year, month, day, hour, minute, second = state
        self.dt = pywintypes.Time(year, month, day, hour, minute, second)

​适用场景​​:需要保留 pywintypes.datetime的功能时。

总结

  1. ​优先转换类型​​:将 pywintypes.datetime转换为标准 datetime或字符串(最简单可靠)。

  2. ​尝试 dill​:若必须保留对象类型,使用 dill可能解决序列化问题。

  3. ​重构设计​​:避免在多进程间传递复杂对象,改用共享内存或外部存储。

  4. ​自定义序列化​​:为特殊对象实现序列化方法(需额外代码)。

如果问题仍未解决,请检查是否还有其他不可序列化的对象(如文件句柄、线程等)。

                  while tx_id != self._tx_id:
                     now = time.time() * 1000
                     if now - start > timeout:
                        raise TimeoutError('Callback: Timeout waiting for data')

                     if self.callback_queue.empty():
                        pythoncom.PumpWaitingMessages()
                     else:
                        tx_id, handles, values, qualities, timestamps = self.callback_queue.get()

异步读取的工作原理

sync=False时,read方法并不是去直接“读”数据,而是​​设置一个订阅(Subscription)并尝试获取自上次查询以来缓存的数据变更​​。

  1. ​设置订阅​​:代码通过 WithEvents为 OPC 组挂接了事件钩子 (GroupEvents)。当组内任何标签的值、质量或时间戳发生变化时,OPC 服务器会主动通知客户端,触发 OnDataChange事件,该事件会将数据放入 callback_queue中。

  2. ​获取数据​​:read方法随后检查 callback_queue。​​如果队列里有数据​​,说明在调用 read之前已经有数据变化发生并被缓存了,它就会取出并返回这些数据。​​如果队列是空的​​,它只会调用 pythoncom.PumpWaitingMessages()处理一下 Windows 消息队列(看看有没有“正在路上”的回调消息),然后就返回了,很可能返回的是空列表或旧数据。

常见问题: callback_queue一直是空的

最常见的原因有以下几个:

  1. ​数据尚未发生变化​​:这是最可能的原因。异步读取 (sync=False) 依赖于数据的​​变化​​。如果从你启动订阅到你调用 read的这段时间内,你请求的标签的值没有任何改变,OPC 服务器就不会发送回调通知,callback_queue自然就是空的。

    • ​同步读取 (sync=True)​​:直接向服务器询问当前值,无论它是否变化。

    • ​异步读取 (sync=False)​​:询问“自上次联系后,有什么新变化吗?”。如果没变化,就没什么可返回的。

  2. ​订阅参数问题​​:update参数设置了服务器向客户端发送数据更新的频率(毫秒)。如果 update值设得非常大(比如默认的 -1,表示使用组的默认更新速率),而数据变化频率低于这个速率,你可能在调用 read时还不到下一次更新的时间。

  3. ​连接或组配置问题​​:虽然你同步读取能成功,但异步订阅可能需要额外的正确设置才能工作。

总结

​根本原因​​:异步读取 (sync=False) 依赖于数据变化产生的事件通知。没有变化,就没有数据进入 callback_queue

同步读取 (sync=True)​

import OpenOPC
import time

opc = OpenOPC.client()
opc.connect('SUPCON.SCRTCore.1', '172.133.1.8')
tags = opc.list('FT_*')

try:   
    count_ = 1
    while count_ < 10:
        data = opc.read(tags, sync=True)
        print(f"data: {data[0]}, {len(data)}, tags: {len(tags)}")
        count_ += 1
        time.sleep(3)
finally:
    opc.close()

异步读取 (sync=False)

import OpenOPC
from libs import opcTags

def data_change_handler():
    opc = OpenOPC.client()
    opc.connect('SUPCON.SCRTCore.1', '172.133.1.8')
    
    try:   
        count_ = 1
        while count_ < 10:
            data = opc.read(opcTags.pv_tags, group='MonitorGroup', sync=False, update=3000, timeout=5000) # False True
            print(f"data: {data[0]}, {len(data)}, tags: {len(opcTags.pv_tags)}")
            count_ += 1
    finally:
        opc.remove('MonitorGroup')
        opc.close()

data_change_handler()

资料:

Python3 - opc-DA数据采集-OpenOPC使用说明 - 代码先锋网

Python2.7 opc-DA 数据采集-OpenOPC使用说明-CSDN博客

Logo

魔乐社区(Modelers.cn) 是一个中立、公益的人工智能社区,提供人工智能工具、模型、数据的托管、展示与应用协同服务,为人工智能开发及爱好者搭建开放的学习交流平台。社区通过理事会方式运作,由全产业链共同建设、共同运营、共同享有,推动国产AI生态繁荣发展。

更多推荐