好的,我们来整合这三个文件的核心功能,并使用 vnpy.gateway.ctp 来构建一个集成了行情处理、智能下单映射和交易状态管理的框架。

这个设计将包含以下几个部分:

  1. RedisMarketHandler: 负责处理从 vn.py 事件引擎接收到的行情数据,进行K线合成,并将Tick和Bar数据存入Redis(改编自 ctp_market_api_fun.py)。
  2. SmartOrderPlacer: 负责接收简单的买卖指令,根据vn.py提供的实时持仓信息和行情信息,决定正确的开平仓标志并发送订单请求(改编自 ctp_function_交易映射.py)。
  3. MyTradingApp: 一个vn.py应用模块,负责:
    • 初始化并连接CTP网关。
    • 初始化 RedisMarketHandlerSmartOrderPlacer
    • 注册事件监听器,将行情事件分发给 RedisMarketHandler,并将合约信息等传递给需要的模块。
    • 提供给策略或其他上层逻辑调用的简化交易接口(如 buy, sell),内部调用 SmartOrderPlacer
    • 管理定时任务(例如K线超时闭合)。

注意:

  • 你需要先安装 vnpy 及其 ctp 网关 (pip install vnpy_ctp)。
  • 你需要安装 redis 库 (pip install redis)。
  • 你需要运行一个 Redis 服务器。
  • 你需要一个 connect_ctp.json 文件来配置CTP账户信息。
# -*- coding: utf-8 -*-
"""
集成化vn.py CTP应用框架示例
结合了行情处理、Redis存储、智能下单映射功能
"""

import re
import time
import threading
import random
from datetime import datetime, timedelta
from typing import Dict, Any, Optional, Tuple

# --- Redis 连接 ---
try:
    import redis
except ImportError:
    print("请先安装 redis 库: pip install redis")
    exit()

# --- vn.py 核心组件 ---
from vnpy.event import Event, EventEngine,# EVENT_TIMER
from vnpy.trader.engine import MainEngine
from vnpy.trader.gateway import BaseGateway
from vnpy.trader.object import (
    TickData, BarData, OrderData, TradeData, PositionData, AccountData, ContractData,
    OrderRequest, CancelRequest, SubscribeRequest, HistoryRequest, LogData
)
from vnpy.trader.constant import Direction, Offset, Exchange, OrderType, Status, Interval
from vnpy.gateway.ctp import CtpGateway # 引入CTP网关

# --- 全局变量和配置 ---
# Redis 客户端实例 (根据你的配置修改)
try:
    redis_client = redis.Redis(host='localhost', port=6379, db=5, decode_responses=True)
    redis_client.ping() # 测试连接
    print("Redis 连接成功")
except Exception as e:
    print(f"Redis 连接失败: {e}. 请确保Redis服务正在运行且配置正确。")
    redis_client = None # 设置为None,后续逻辑会处理

# CTP 账户配置文件名
CTP_CONNECT_FILE = "connect_ctp.json"

# --- 1. 行情处理与Redis存储模块 ---
class RedisMarketHandler:
    """
    处理行情数据,合成K线,并存入Redis
    改编自 ctp_market_api_fun.py
    """
    def __init__(self, redis_cli: redis.Redis, event_engine: EventEngine):
        self.redis_client = redis_cli
        self.event_engine = event_engine
        self.bar_m1: Dict[str, BarData] = {} # vt_symbol -> BarData
        self.last_tick_volume: Dict[str, int] = {} # vt_symbol -> Volume
        self.contracts: Dict[str, ContractData] = {} # vt_symbol -> ContractData
        self.active_symbols = set() # 记录已订阅的合约

        # K线超时闭合定时器 (每15秒检查一次)
        self._timer_count = 0
        self.event_engine.register(EVENT_TIMER, self.process_timer_event)

        print("Redis行情处理器初始化完成")

    def update_contract(self, contract: ContractData):
        """更新合约信息"""
        self.contracts[contract.vt_symbol] = contract
        # print(f"更新合约信息: {contract.vt_symbol}")

    def subscribe(self, vt_symbol: str):
        """记录需要处理行情的合约"""
        self.active_symbols.add(vt_symbol)

    def process_tick_event(self, event: Event):
        """处理 vn.py 推送的 Tick 事件"""
        tick: TickData = event.data
        if not self.redis_client or tick.vt_symbol not in self.active_symbols:
            return

        # print(f"收到 Tick: {tick.vt_symbol}, Price: {tick.last_price}, Time: {tick.datetime}")

        # 1. 计算成交量增量
        last_volume = self.last_tick_volume.get(tick.vt_symbol, 0)
        delta_volume = tick.volume - last_volume
        # 处理CTP开盘或重连时成交量突变/重置的情况
        if delta_volume < 0 or last_volume == 0:
            delta_volume = tick.volume if last_volume == 0 else 0 # 首次tick算全部,后续重置算0或特殊处理
        self.last_tick_volume[tick.vt_symbol] = tick.volume

        # 2. 存储最新 Tick 数据到 Redis
        tick_key = f"{tick.vt_symbol.upper()}_tick"
        # 转换为字典存储 (eval在原始代码中使用,这里用更安全的方式)
        # 注意:vn.py的TickData.datetime已经是校准过的本地时间datetime对象
        tick_dict = {
            '交易日': tick.datetime.strftime("%Y%m%d"), # 或使用tick内字段,vnpy tick可能没有trading_day
            '日期': tick.datetime.strftime("%Y-%m-%d"),
            '合约代码': tick.symbol,
            'vt_symbol': tick.vt_symbol,
            '品种代码': re.sub(r'[^a-zA-Z]', '', tick.symbol),
            '最新价': tick.last_price,
            '数量': tick.volume,
            '持仓量': tick.open_interest,
            '涨停板价': tick.limit_up,
            '跌停板价': tick.limit_down,
            '最后修改时间': tick.datetime.strftime("%H:%M:%S"),
            '最后修改毫秒': tick.datetime.microsecond // 1000,
            '申买价一': tick.bid_price_1,
            '申买量一': tick.bid_volume_1,
            '申卖价一': tick.ask_price_1,
            '申卖量一': tick.ask_volume_1,
            '日期时间': tick.datetime.strftime("%Y-%m-%d %H:%M:%S"),
            'vol': delta_volume # 存储计算出的增量成交量
        }
        try:
            # 使用 HSET 存储字典,比存储字符串更灵活
            self.redis_client.hset(tick_key, mapping=tick_dict)
            # print(f"Redis 更新 Tick HASH: {tick_key}")
        except Exception as e:
            print(f"写入Redis Tick HASH失败 ({tick_key}): {e}")

        # 3. 更新或生成1分钟 K线 (仅当有实际成交量时)
        if delta_volume > 0 and tick.last_price > 0: # 过滤掉集合竞价或无成交量的tick
            self._update_bar(tick, delta_volume)

    def _update_bar(self, tick: TickData, delta_volume: int):
        """根据Tick更新1分钟K线"""
        bar = self.bar_m1.get(tick.vt_symbol)
        contract = self.contracts.get(tick.vt_symbol)
        if not contract:
            # print(f"警告: 无法找到合约信息 {tick.vt_symbol},跳过K线合成")
            return

        # K线时间戳,对齐到分钟开始
        bar_time = tick.datetime.replace(second=0, microsecond=0)

        bar_key_new = f"{tick.vt_symbol.upper()}_1m_new"
        bar_key_close = f"{tick.vt_symbol.upper()}_1m_close"

        # 计算成交额增量
        tick_turnover = delta_volume * tick.last_price * contract.size

        if not bar:
            # 创建新Bar
            bar = BarData(
                gateway_name=tick.gateway_name,
                symbol=tick.symbol,
                exchange=tick.exchange,
                datetime=bar_time,
                interval=Interval.MINUTE,
                volume=delta_volume,
                turnover=tick_turnover,
                open_interest=tick.open_interest,
                open_price=tick.last_price,
                high_price=tick.last_price,
                low_price=tick.last_price,
                close_price=tick.last_price
            )
            self.bar_m1[tick.vt_symbol] = bar
            # print(f"创建新Bar: {tick.vt_symbol} @ {bar_time}")
        elif bar.datetime == bar_time:
            # 更新当前Bar
            bar.high_price = max(bar.high_price, tick.last_price)
            bar.low_price = min(bar.low_price, tick.last_price)
            bar.close_price = tick.last_price
            bar.volume += delta_volume
            bar.turnover += tick_turnover
            bar.open_interest = tick.open_interest
            # print(f"更新Bar: {tick.vt_symbol} @ {bar_time}")
        else:
            # 分钟切换,旧Bar结束
            # 1. 保存旧Bar到Redis Close List
            closed_bar_list = [
                bar.datetime.strftime("%Y-%m-%d %H:%M:%S"),
                bar.open_price, bar.high_price, bar.low_price, bar.close_price,
                bar.volume, bar.turnover, bar.open_interest
            ]
            try:
                self.redis_client.rpush(bar_key_close, str(closed_bar_list)) # 列表转字符串存储
                # print(f"Redis 关闭 K线 List: {bar_key_close} @ {bar.datetime}")
            except Exception as e:
                print(f"写入Redis K线List失败 ({bar_key_close}): {e}")

            # 2. 创建新Bar
            bar = BarData(
                gateway_name=tick.gateway_name,
                symbol=tick.symbol,
                exchange=tick.exchange,
                datetime=bar_time,
                interval=Interval.MINUTE,
                volume=delta_volume,
                turnover=tick_turnover,
                open_interest=tick.open_interest,
                open_price=tick.last_price,
                high_price=tick.last_price,
                low_price=tick.last_price,
                close_price=tick.last_price
            )
            self.bar_m1[tick.vt_symbol] = bar
            # print(f"分钟切换,创建新Bar: {tick.vt_symbol} @ {bar_time}")

        # 3. 更新Redis中的实时Bar ('_1m_new')
        # 存储为 HASH 更易于读取特定字段
        new_bar_dict = {
            "trade_time": bar.datetime.strftime("%Y-%m-%d %H:%M:%S"),
            "open": bar.open_price,
            "high": bar.high_price,
            "low": bar.low_price,
            "close": bar.close_price,
            "volume": bar.volume,
            "turnover": bar.turnover,
            "open_interest": bar.open_interest,
        }
        try:
            self.redis_client.hset(bar_key_new, mapping=new_bar_dict)
            # print(f"Redis 更新实时 K线 HASH: {bar_key_new}")
        except Exception as e:
            print(f"写入Redis实时K线 HASH失败 ({bar_key_new}): {e}")

    def process_timer_event(self, event: Event):
        """处理定时器事件,用于超时闭合K线"""
        self._timer_count += 1
        if self._timer_count < 15: # 每 15 * 1秒 = 15秒 检查一次
             return
        self._timer_count = 0 # 重置计数器

        if not self.redis_client:
            return

        now = datetime.now()
        # print(f"定时检查K线超时: {now}")
        symbols_to_clear = []
        for vt_symbol, bar in self.bar_m1.items():
            # 如果bar的最后更新时间(用bar.datetime近似)距离现在超过一定时间(例如90秒)
            # 并且当前时间不在交易时段内(这个判断比较复杂,暂时简化为只看时间差)
            if (now - bar.datetime) > timedelta(seconds=90):
                print(f"检测到超时K线,准备闭合: {vt_symbol} @ {bar.datetime}")
                bar_key_close = f"{vt_symbol.upper()}_1m_close"
                closed_bar_list = [
                    bar.datetime.strftime("%Y-%m-%d %H:%M:%S"),
                    bar.open_price, bar.high_price, bar.low_price, bar.close_price,
                    bar.volume, bar.turnover, bar.open_interest
                ]
                try:
                    self.redis_client.rpush(bar_key_close, str(closed_bar_list))
                    print(f"Redis 超时关闭 K线 List: {bar_key_close}")
                    symbols_to_clear.append(vt_symbol)
                except Exception as e:
                     print(f"Redis超时关闭K线失败 ({bar_key_close}): {e}")

        # 清理内存中已闭合的Bar
        for vt_symbol in symbols_to_clear:
            if vt_symbol in self.bar_m1:
                del self.bar_m1[vt_symbol]
                print(f"已从内存清理超时K线: {vt_symbol}")

# --- 2. 智能下单映射模块 ---
class SmartOrderPlacer:
    """
    根据持仓和规则决定开平仓标志,并发送订单
    改编自 ctp_function_交易映射.py
    """
    def __init__(self, main_engine: MainEngine, redis_cli: Optional[redis.Redis]):
        self.main_engine = main_engine
        self.redis_client = redis_cli
        self.contracts: Dict[str, ContractData] = {}
        # 需要区分平今的交易所列表
        self.shfe_ine_exchanges = {Exchange.SHFE, Exchange.INE}
        # 股指期货等需要锁仓的品种前缀 (根据实际需要调整)
        self.lock_codes = {"IF", "IC", "IM", "IH"}
        print("智能下单处理器初始化完成")

    def update_contract(self, contract: ContractData):
        """更新合约信息"""
        self.contracts[contract.vt_symbol] = contract

    def get_latest_price(self, vt_symbol: str) -> Optional[Tuple[float, float]]:
        """获取最新买一卖一价 (优先从vn.py获取,失败则尝试Redis)"""
        tick = self.main_engine.get_tick(vt_symbol)
        if tick and tick.bid_price_1 > 0 and tick.ask_price_1 > 0:
            return tick.bid_price_1, tick.ask_price_1
        elif self.redis_client:
            # 尝试从Redis获取 (假设Redis里存了HASH)
            tick_key = f"{vt_symbol.upper()}_tick"
            try:
                tick_dict = self.redis_client.hgetall(tick_key)
                bid_price = float(tick_dict.get('申买价一', 0))
                ask_price = float(tick_dict.get('申卖价一', 0))
                if bid_price > 0 and ask_price > 0:
                    return bid_price, ask_price
            except Exception as e:
                print(f"从Redis获取价格失败 ({tick_key}): {e}")
                return None
        return None

    def place_smart_order(
        self,
        vt_symbol: str,
        direction: Direction,
        volume: float,
        order_type: OrderType = OrderType.LIMIT, # 默认限价单
        lock: bool = False, # 是否强制锁仓(暂未使用)
        extra_price_ticks: int = 5 # 超价tick数
    ) -> Optional[str]: # 返回vn.py的订单号 vt_orderid
        """
        智能下单主函数

        Args:
            vt_symbol (str): 合约代码.交易所
            direction (Direction): 方向 (买/卖)
            volume (float): 数量
            order_type (OrderType): 订单类型 (LIMIT, MARKET, etc.)
            lock (bool): 是否强制锁仓 (目前影响不大,由品种决定)
            extra_price_ticks (int): 限价单超价的tick数

        Returns:
            Optional[str]: vn.py格式的订单ID (vt_orderid),如果下单失败则返回None
        """
        contract = self.contracts.get(vt_symbol)
        if not contract:
            print(f"错误: 找不到合约信息 {vt_symbol}")
            return None

        position = self.main_engine.get_position(vt_symbol) # vn.py 会自动合并多空头寸到一个对象
        if not position: # 如果没有持仓记录,创建一个空的
             position = PositionData(gateway_name=".", symbol=contract.symbol, exchange=contract.exchange)

        price_tuple = self.get_latest_price(vt_symbol)
        if not price_tuple:
            print(f"错误: 无法获取 {vt_symbol} 的最新买卖价")
            return None
        bid_price, ask_price = price_tuple

        # 1. 确定价格
        order_price = 0.0
        if order_type == OrderType.LIMIT:
            if direction == Direction.LONG: # 买入
                order_price = ask_price + contract.pricetick * extra_price_ticks
            else: # 卖出
                order_price = bid_price - contract.pricetick * extra_price_ticks
            # 价格需要对齐到最小变动价位
            order_price = round(round(order_price / contract.pricetick) * contract.pricetick, 10)
            # 防止价格超出涨跌停
            order_price = max(order_price, contract.lower_limit)
            order_price = min(order_price, contract.upper_limit)
            if order_price <= 0:
                 print(f"错误: 计算出的限价单价格异常 {order_price} for {vt_symbol}")
                 return None

        # 2. 确定开平标志 (核心逻辑)
        offset = Offset.OPEN # 默认开仓

        product_code = re.sub(r'[^a-zA-Z]', '', contract.symbol).upper()
        is_lock_product = product_code in self.lock_codes
        is_shfe_ine = contract.exchange in self.shfe_ine_exchanges

        if direction == Direction.LONG: # 计划买入
            # 如果持有空仓,则需要平仓
            if position.short_pos > 0:
                # 对于上期/能源所,优先平今
                if is_shfe_ine and position.short_td >= volume:
                    offset = Offset.CLOSETODAY
                # 否则,平昨仓 (vn.py的Offset.CLOSE代表平昨优先)
                else:
                    offset = Offset.CLOSE
                # 注意:这里简化了逻辑,没有处理平昨不够再平今的情况
                # vn.py CTP网关通常会自动处理Close请求(先平昨再平今),除非交易所严格要求
                # 如果需要严格控制,需要拆分订单,先下Close平昨,再下CloseToday平今

        else: # 计划卖出
            # 如果持有多仓,则需要平仓
            if position.long_pos > 0:
                 # 股指类产品优先锁仓(即开反向仓)
                 if is_lock_product and position.long_td > 0: # 只有日内持仓才倾向于锁仓
                     offset = Offset.OPEN # 通过开反向仓实现锁仓
                 # 对于上期/能源所,优先平今
                 elif is_shfe_ine and position.long_td >= volume:
                     offset = Offset.CLOSETODAY
                 # 否则,平昨仓
                 else:
                     offset = Offset.CLOSE
                 # 同样,简化了先平昨再平今的拆单逻辑

        # 如果经过判断仍然是开仓,并且目标方向已有持仓,打印提示(可能重复开仓)
        if offset == Offset.OPEN:
             if (direction == Direction.LONG and position.long_pos > 0) or \
                (direction == Direction.SHORT and position.short_pos > 0):
                 print(f"提示: {vt_symbol} {direction.value} {offset.value},但已有同向持仓 {position.long_pos if direction == Direction.LONG else position.short_pos}")

        # 3. 构建订单请求
        req = OrderRequest(
            symbol=contract.symbol,
            exchange=contract.exchange,
            direction=direction,
            offset=offset,
            type=order_type,
            volume=volume,
            price=order_price,
            reference=f"smart_{int(time.time()*1000)}" # 增加唯一性
        )

        # 4. 发送订单
        vt_orderid = self.main_engine.send_order(req, contract.gateway_name)

        if vt_orderid:
            print(f"智能下单成功: {vt_orderid} | {vt_symbol} {direction.value} {volume} @ {order_price if order_price > 0 else '市价'} | 开平: {offset.value}")
            return vt_orderid
        else:
            print(f"智能下单失败: {vt_symbol} {direction.value} {volume}")
            return None

    def rollover_and_unlock(self):
        """执行换月和解锁逻辑 (待实现)"""
        print("换月解锁功能暂未实现")
        # 实现逻辑:
        # 1. 获取所有持仓 main_engine.get_all_positions()
        # 2. 对比持仓合约与主力合约字典 (需要外部提供)
        # 3. 如果持有非主力合约,生成平非主力、开主力的OrderRequest并发送
        # 4. 检查是否存在锁仓情况 (多空持仓同时存在且昨仓>0)
        # 5. 如果需要解锁,生成对锁平仓的OrderRequest并发送 (注意区分平今/平昨)
        pass

# --- 3. 主应用模块 ---
class MyTradingApp:
    """
    vn.py 应用模块,整合行情处理、下单映射和交易流程
    """
    def __init__(self, main_engine: MainEngine, event_engine: EventEngine):
        self.main_engine = main_engine
        self.event_engine = event_engine
        self.gateway_name = "CTP" # 与 connect_ctp.json 中的 gatewayName 一致

        # 初始化 Redis 处理器和下单器
        self.redis_handler = None
        if redis_client:
            self.redis_handler = RedisMarketHandler(redis_client, self.event_engine)
        else:
            print("警告: Redis未连接,行情存储和部分功能将不可用。")

        self.order_placer = SmartOrderPlacer(self.main_engine, redis_client)

        self.subscribed_symbols = set() # 记录已通过vn.py订阅的合约

        print("MyTradingApp 初始化完成")

    def start(self):
        """启动应用,注册事件监听"""
        self.register_event_handlers()
        print("事件监听器注册完成")
        # 连接网关(如果尚未连接)
        self.connect_gateway()

    def connect_gateway(self):
        """加载并连接CTP网关"""
        try:
            self.main_engine.add_gateway(CtpGateway)
            print(f"添加网关 {self.gateway_name} 成功")
        except Exception as e:
             print(f"添加网关 {self.gateway_name} 失败: {e}") # 可能已存在

        # 使用配置文件连接
        self.main_engine.connect(CTP_CONNECT_FILE, self.gateway_name)
        print(f"尝试连接网关 {self.gateway_name} 使用配置文件 {CTP_CONNECT_FILE}...")
        # 需要等待连接成功的回调或轮询状态

    def register_event_handlers(self):
        """注册需要监听的事件"""
        self.event_engine.register(EVENT_TICK, self.process_tick_event)
        self.event_engine.register(EVENT_ORDER, self.process_order_event)
        self.event_engine.register(EVENT_TRADE, self.process_trade_event)
        self.event_engine.register(EVENT_POSITION, self.process_position_event)
        self.event_engine.register(EVENT_ACCOUNT, self.process_account_event)
        self.event_engine.register(EVENT_CONTRACT, self.process_contract_event)
        self.event_engine.register(EVENT_LOG, self.process_log_event)

    # --- 事件处理回调 ---
    def process_tick_event(self, event: Event):
        """处理行情事件,分发给 Redis 处理器"""
        if self.redis_handler:
            self.redis_handler.process_tick_event(event)

    def process_order_event(self, event: Event):
        """处理订单回报事件"""
        order: OrderData = event.data
        # 可以根据需要添加逻辑,例如更新UI、记录日志等
        # print(f"订单更新: {order.vt_orderid} | Status: {order.status.value} | Filled: {order.traded}/{order.volume}")
        pass

    def process_trade_event(self, event: Event):
        """处理成交回报事件"""
        trade: TradeData = event.data
        print(f"*** 成交通知 ***: {trade.vt_orderid} | {trade.direction.value} {trade.offset.value} | {trade.volume} @ {trade.price}")
        # 成交后可以触发其他逻辑,如止盈止损计算等
        pass

    def process_position_event(self, event: Event):
        """处理持仓更新事件"""
        position: PositionData = event.data
        # vn.py网关内部会维护持仓,这里主要是为了日志或触发特定逻辑
        # print(f"持仓更新: {position.vt_symbol} | Long: {position.long_pos} (TD:{position.long_td}, YD:{position.long_yd}) | Short: {position.short_pos} (TD:{position.short_td}, YD:{position.short_yd})")
        pass

    def process_account_event(self, event: Event):
        """处理账户资金事件"""
        account: AccountData = event.data
        # print(f"账户更新: {account.accountid} | Balance: {account.balance:.2f} | Available: {account.available:.2f}")
        pass

    def process_contract_event(self, event: Event):
        """处理合约信息事件,更新内部存储"""
        contract: ContractData = event.data
        if self.redis_handler:
            self.redis_handler.update_contract(contract)
        self.order_placer.update_contract(contract)

    def process_log_event(self, event: Event):
        """处理日志事件"""
        log: LogData = event.data
        # 避免打印过多行情日志,可以过滤
        if "onRtnDepthMarketData" not in log.msg and "查询行情快照频率" not in log.msg :
             print(f"日志: [{log.level.name}] {log.msg}")


    # --- 对外交易接口 ---
    def subscribe(self, symbols: list):
        """订阅行情"""
        gateway = self.main_engine.get_gateway(self.gateway_name)
        if not gateway or not gateway.active:
            print(f"错误: 网关 {self.gateway_name} 未连接或未激活")
            return

        print(f"尝试订阅行情: {symbols}")
        for symbol in symbols:
            # 获取合约信息,重点是交易所
            contract = self.main_engine.get_contract(symbol) # 这里symbol可能是简写如 ag2412
            if not contract:
                print(f"警告: 无法找到合约 {symbol} 的信息,无法订阅")
                continue

            vt_symbol = contract.vt_symbol # 获取完整的 vt_symbol
            req = SubscribeRequest(symbol=contract.symbol, exchange=contract.exchange)
            self.main_engine.subscribe(req, gateway.gateway_name)
            self.subscribed_symbols.add(vt_symbol)
            if self.redis_handler:
                self.redis_handler.subscribe(vt_symbol) # 通知Redis处理器也关注这个合约
            print(f"已发送 {vt_symbol} 的订阅请求")
            time.sleep(0.1) # 防止订阅过快

    def buy(self, vt_symbol: str, volume: float, price: float = 0.0, order_type: OrderType = OrderType.LIMIT, **kwargs) -> Optional[str]:
        """简化买入接口"""
        # 如果price > 0, 优先使用传入价格构建限价单请求
        if price > 0 and order_type == OrderType.LIMIT:
            contract = self.contracts.get(vt_symbol)
            if not contract:
                print(f"错误: 找不到合约信息 {vt_symbol}")
                return None
            req = OrderRequest(
                symbol=contract.symbol,
                exchange=contract.exchange,
                direction=Direction.LONG,
                offset=Offset.OPEN, # 简化接口先默认为开仓,智能下单会判断
                type=OrderType.LIMIT,
                volume=volume,
                price=price,
                reference=f"simple_buy_{int(time.time()*1000)}"
            )
             # 注意:这里构建了简单的OrderRequest,但仍应通过SmartOrderPlacer处理开平
             # return self.main_engine.send_order(req, contract.gateway_name)
             # **修正:应调用智能下单器**
            print("提示: buy接口调用智能下单器进行开平判断...")
            return self.order_placer.place_smart_order(vt_symbol, Direction.LONG, volume, order_type=order_type)
        else: # 否则,使用智能下单(传入价格0或市价单类型)
             return self.order_placer.place_smart_order(vt_symbol, Direction.LONG, volume, order_type=order_type)


    def sell(self, vt_symbol: str, volume: float, price: float = 0.0, order_type: OrderType = OrderType.LIMIT, **kwargs) -> Optional[str]:
        """简化卖出接口(通常用于平多仓或开空仓)"""
        # 同样,优先使用传入价格构建限价单
        if price > 0 and order_type == OrderType.LIMIT:
            contract = self.contracts.get(vt_symbol)
            if not contract:
                print(f"错误: 找不到合约信息 {vt_symbol}")
                return None
            # **修正:应调用智能下单器**
            print("提示: sell接口调用智能下单器进行开平判断...")
            return self.order_placer.place_smart_order(vt_symbol, Direction.SHORT, volume, order_type=order_type)
        else:
             return self.order_placer.place_smart_order(vt_symbol, Direction.SHORT, volume, order_type=order_type)

    def cover(self, vt_symbol: str, volume: float, price: float = 0.0, order_type: OrderType = OrderType.LIMIT, **kwargs) -> Optional[str]:
        """简化买平接口(通常用于平空仓)"""
        # **修正:应调用智能下单器**
        print("提示: cover接口调用智能下单器进行开平判断...")
        return self.order_placer.place_smart_order(vt_symbol, Direction.LONG, volume, order_type=order_type)

    def stop(self):
        """停止应用"""
        self.main_engine.close()
        print("应用已停止")


# --- 主程序入口 ---
def main():
    """主函数"""
    # 创建事件引擎和主引擎
    event_engine = EventEngine()
    main_engine = MainEngine(event_engine)

    # 创建并启动交易应用
    trading_app = MyTradingApp(main_engine, event_engine)
    trading_app.start() # 会连接网关并注册事件

    print("等待CTP网关连接成功...")
    # 需要等待网关连接成功,这里简单等待几秒
    # 正式应用需要检查网关状态或等待特定事件
    time.sleep(10)

    # 获取 CTP 网关对象
    gateway = main_engine.get_gateway(trading_app.gateway_name)
    if not gateway or not gateway.active:
         print(f"网关 {trading_app.gateway_name} 连接失败,请检查配置和网络。程序退出。")
         return

    print(f"网关 {trading_app.gateway_name} 连接成功。")

    # 订阅行情示例 (从配置文件或列表读取)
    # 注意:symbol应该是合约代码,如 ag2412, IF2412
    symbols_to_subscribe = ["ag2412", "au2412", "IF2412"] # 示例合约
    trading_app.subscribe(symbols_to_subscribe)

    print("行情订阅已发送,等待行情数据...")
    time.sleep(5) # 等待合约信息和初始行情

    # --- 交易指令示例 ---
    vt_symbol_ag = "ag2412.SHFE" # 使用 vn.py 的 vt_symbol 格式
    vt_symbol_if = "IF2412.CFFEX"

    # 示例1: 使用智能下单买入开仓 ag 1手 (自动判断开平)
    print("\n--- 示例1: 智能买入开仓 ag ---")
    trading_app.buy(vt_symbol_ag, 1)
    time.sleep(2) # 等待订单回报

    # 示例2: 再次智能买入 ag 1手 (如果上一单成交,这次还是开仓)
    print("\n--- 示例2: 再次智能买入 ag ---")
    trading_app.buy(vt_symbol_ag, 1)
    time.sleep(2)

    # 示例3: 智能卖出平仓 ag 1手 (假设之前有买入持仓)
    print("\n--- 示例3: 智能卖出平仓 ag ---")
    trading_app.sell(vt_symbol_ag, 1)
    time.sleep(2)

    # 示例4: 智能卖出开仓 IF 1手
    print("\n--- 示例4: 智能卖出开仓 IF ---")
    trading_app.sell(vt_symbol_if, 1)
    time.sleep(2)

    # 示例5: 智能买入平仓 IF 1手 (股指,假设之前有卖出持仓)
    print("\n--- 示例5: 智能买入平仓 IF ---")
    trading_app.buy(vt_symbol_if, 1)
    time.sleep(2)


    # 阻塞主线程,使事件引擎可以继续运行
    print("\n按 CTRL+C 退出程序")
    while True:
        try:
            time.sleep(1)
        except KeyboardInterrupt:
            print("收到退出信号...")
            break

    # 关闭
    trading_app.stop()

if __name__ == "__main__":
    main()

代码解释:

  1. Redis 连接与配置:

    • 代码首先尝试连接 Redis 服务器。如果失败,会打印警告,RedisMarketHandler 的相关功能将不可用。
    • CTP_CONNECT_FILE 定义了 CTP 账户的 JSON 配置文件名。
  2. RedisMarketHandler 类:

    • 初始化 (__init__): 接收 Redis 客户端和 vn.py 事件引擎实例。初始化用于存储K线、最新成交量和合约信息的字典。注册定时器事件处理函数。
    • update_contract: 由 MyTradingApp 调用,用于接收并存储 vn.py 推送的合约详细信息(乘数、最小变动价位等)。
    • subscribe: 记录需要处理行情的 vt_symbol
    • process_tick_event:
      • 这是核心的行情处理函数,由事件引擎在收到 EVENT_TICK 时调用。
      • 计算增量成交量 (delta_volume),并处理开盘/重连时成交量可能重置的问题。
      • 将处理后的 Tick 信息(包含增量成交量 vol)以 HASH 格式存入 Redis(键格式:VT_SYMBOL_tick)。使用 HASH 比存储字符串更方便读取特定字段。
      • 如果 delta_volume > 0 且价格有效,调用 _update_bar 处理K线。
    • _update_bar:
      • 根据 Tick 数据和合约信息(乘数)计算成交额。
      • 判断当前 Tick 属于哪个1分钟 K线。
      • 创建新 Bar: 如果是该合约的第一个 Tick 或新的一分钟开始。
      • 更新当前 Bar: 如果 Tick 属于正在进行的 K线。
      • 闭合旧 Bar: 当分钟切换时,将完成的 K线数据(列表转字符串)存入 Redis 的 List 中(键格式:VT_SYMBOL_1m_close),rpush 实现追加。
      • 更新实时 Bar: 将当前正在形成的 K线数据(无论新建还是更新)以 HASH 格式写入 Redis(键格式:VT_SYMBOL_1m_new),hset 实现覆盖更新。
    • process_timer_event: 定期(每15秒)检查 bar_m1 中的 K线,如果某个 K线 的时间戳距离当前时间过长(例如90秒),则认为该 K线 超时未更新(可能收盘或休市),将其数据存入 Redis 的 _1m_close 列表,并从内存 bar_m1 中移除。
  3. SmartOrderPlacer 类:

    • 初始化 (__init__): 接收 MainEngine(用于发单、查行情、查持仓、查合约)和 Redis 客户端实例。
    • update_contract: 存储合约信息,特别是交易所信息(用于判断平今规则)、priceticksize
    • get_latest_price: 优先从 main_engine.get_tick 获取最新 Tick 的买一卖一价。如果失败,则尝试从 Redis 读取之前 RedisMarketHandler 存入的 Tick HASH 数据。
    • place_smart_order:
      • 获取信息: 获取目标合约 ContractData、当前 PositionData、最新买卖价。
      • 计算价格: 对限价单计算超价,并确保价格在涨跌停内且符合最小变动价位。
      • 判断开平 (Offset): 这是核心逻辑:
        • 默认 Offset.OPEN
        • 检查反向持仓是否存在 (position.short_posposition.long_pos)。
        • 如果存在反向持仓:
          • 检查是否为股指期货等需要锁仓的品种 (is_lock_product) 且有日内反向持仓,如果是,则保持 Offset.OPEN(开反向仓锁仓)。
          • 检查是否为上期所/能源中心合约 (is_shfe_ine) 且今日反向持仓 (position.short_tdposition.long_td) 足够平掉本次下单量,如果是,则设置 Offset.CLOSETODAY
          • 其他情况(包括平昨仓或交易所无特殊要求),设置 Offset.CLOSE(vn.py 的 CTP 网关通常会智能处理 Close:先平昨再平今)。
        • 注意: 此处简化了需要严格区分平昨、平今并拆分订单的复杂情况。
      • 构建 OrderRequest: 使用 vn.py 的 OrderRequest 对象,填入计算好的价格、判断好的 Offset 等信息。
      • 发送订单: 调用 main_engine.send_order 发送订单请求。
      • 返回 vn.py 生成的 vt_orderidNone
    • rollover_and_unlock: 提供了接口,但具体逻辑需要根据实际需求实现。
  4. MyTradingApp 类:

    • 初始化 (__init__): 创建 MainEngineEventEngine 的引用,初始化 RedisMarketHandler (如果 Redis 可用) 和 SmartOrderPlacer
    • start: 注册各类事件的处理函数,并调用 connect_gateway
    • connect_gateway: 添加 CtpGatewayMainEngine 并使用配置文件发起连接。
    • register_event_handlers: 将 vn.py 的各种事件(Tick, Order, Trade, Position, Account, Contract, Log)绑定到相应的 process_..._event 方法。
    • process_..._event 方法:
      • process_tick_event: 将 Tick 事件转发给 redis_handler 处理。
      • process_contract_event: 将合约信息分别更新到 redis_handlerorder_placer
      • 其他事件处理函数目前主要用于打印日志或作为未来扩展的入口。vn.py 网关本身会处理大部分状态更新。
    • subscribe: 接收合约代码列表(如 ag2412),查找对应的 ContractData,构建 SubscribeRequest,并通过 main_engine.subscribe 发送订阅请求。同时通知 redis_handler 开始处理该合约的行情。
    • buy/sell/cover: 提供给上层策略调用的简化接口。它们内部都调用 order_placer.place_smart_order 来执行下单,从而利用其智能判断开平的功能。
  5. main 函数:

    • 创建 MainEngineEventEngine
    • 创建 MyTradingApp 实例并启动。
    • 等待网关连接成功。
    • 调用 trading_app.subscribe 订阅所需合约的行情。
    • 包含了一些调用 trading_app.buy/sell 的示例,演示如何触发智能下单。
    • 最后进入一个循环阻塞主线程,允许事件引擎处理事件,直到用户按 CTRL+C 退出。
    • 退出前调用 trading_app.stop() 关闭主引擎。

这个框架将原始代码的功能模块化,并利用了 vn.py 的事件驱动机制和内置的 CTP 网关功能,使得结构更清晰,更容易扩展。行情处理和下单决策的核心逻辑得以保留和适配。

Logo

魔乐社区(Modelers.cn) 是一个中立、公益的人工智能社区,提供人工智能工具、模型、数据的托管、展示与应用协同服务,为人工智能开发及爱好者搭建开放的学习交流平台。社区通过理事会方式运作,由全产业链共同建设、共同运营、共同享有,推动国产AI生态繁荣发展。

更多推荐