基于vnpy-ctp接口自建量化平台的接口api封装,实盘用代码经过大模型重构分享 仅供参考学习 不可运行
好的,我们来整合这三个文件的核心功能,并使用来构建一个集成了行情处理、智能下单映射和交易状态管理的框架。这个设计将包含以下几个部分:注意:代码解释:Redis 连接与配置: 类: 类: 类: 函数:这个框架将原始代码的功能模块化,并利用了 vn.py 的事件驱动机制和内置的 CTP 网关功能,使得结构更清晰,更容易扩展。行情处理和下单决策的核心逻辑得以保留和适配。
·
好的,我们来整合这三个文件的核心功能,并使用 vnpy.gateway.ctp 来构建一个集成了行情处理、智能下单映射和交易状态管理的框架。
这个设计将包含以下几个部分:
RedisMarketHandler: 负责处理从 vn.py 事件引擎接收到的行情数据,进行K线合成,并将Tick和Bar数据存入Redis(改编自ctp_market_api_fun.py)。SmartOrderPlacer: 负责接收简单的买卖指令,根据vn.py提供的实时持仓信息和行情信息,决定正确的开平仓标志并发送订单请求(改编自ctp_function_交易映射.py)。MyTradingApp: 一个vn.py应用模块,负责:- 初始化并连接CTP网关。
- 初始化
RedisMarketHandler和SmartOrderPlacer。 - 注册事件监听器,将行情事件分发给
RedisMarketHandler,并将合约信息等传递给需要的模块。 - 提供给策略或其他上层逻辑调用的简化交易接口(如
buy,sell),内部调用SmartOrderPlacer。 - 管理定时任务(例如K线超时闭合)。
注意:
- 你需要先安装
vnpy及其ctp网关 (pip install vnpy_ctp)。 - 你需要安装
redis库 (pip install redis)。 - 你需要运行一个 Redis 服务器。
- 你需要一个
connect_ctp.json文件来配置CTP账户信息。
# -*- coding: utf-8 -*-
"""
集成化vn.py CTP应用框架示例
结合了行情处理、Redis存储、智能下单映射功能
"""
import re
import time
import threading
import random
from datetime import datetime, timedelta
from typing import Dict, Any, Optional, Tuple
# --- Redis 连接 ---
try:
import redis
except ImportError:
print("请先安装 redis 库: pip install redis")
exit()
# --- vn.py 核心组件 ---
from vnpy.event import Event, EventEngine,ਔ # EVENT_TIMER
from vnpy.trader.engine import MainEngine
from vnpy.trader.gateway import BaseGateway
from vnpy.trader.object import (
TickData, BarData, OrderData, TradeData, PositionData, AccountData, ContractData,
OrderRequest, CancelRequest, SubscribeRequest, HistoryRequest, LogData
)
from vnpy.trader.constant import Direction, Offset, Exchange, OrderType, Status, Interval
from vnpy.gateway.ctp import CtpGateway # 引入CTP网关
# --- 全局变量和配置 ---
# Redis 客户端实例 (根据你的配置修改)
try:
redis_client = redis.Redis(host='localhost', port=6379, db=5, decode_responses=True)
redis_client.ping() # 测试连接
print("Redis 连接成功")
except Exception as e:
print(f"Redis 连接失败: {e}. 请确保Redis服务正在运行且配置正确。")
redis_client = None # 设置为None,后续逻辑会处理
# CTP 账户配置文件名
CTP_CONNECT_FILE = "connect_ctp.json"
# --- 1. 行情处理与Redis存储模块 ---
class RedisMarketHandler:
"""
处理行情数据,合成K线,并存入Redis
改编自 ctp_market_api_fun.py
"""
def __init__(self, redis_cli: redis.Redis, event_engine: EventEngine):
self.redis_client = redis_cli
self.event_engine = event_engine
self.bar_m1: Dict[str, BarData] = {} # vt_symbol -> BarData
self.last_tick_volume: Dict[str, int] = {} # vt_symbol -> Volume
self.contracts: Dict[str, ContractData] = {} # vt_symbol -> ContractData
self.active_symbols = set() # 记录已订阅的合约
# K线超时闭合定时器 (每15秒检查一次)
self._timer_count = 0
self.event_engine.register(EVENT_TIMER, self.process_timer_event)
print("Redis行情处理器初始化完成")
def update_contract(self, contract: ContractData):
"""更新合约信息"""
self.contracts[contract.vt_symbol] = contract
# print(f"更新合约信息: {contract.vt_symbol}")
def subscribe(self, vt_symbol: str):
"""记录需要处理行情的合约"""
self.active_symbols.add(vt_symbol)
def process_tick_event(self, event: Event):
"""处理 vn.py 推送的 Tick 事件"""
tick: TickData = event.data
if not self.redis_client or tick.vt_symbol not in self.active_symbols:
return
# print(f"收到 Tick: {tick.vt_symbol}, Price: {tick.last_price}, Time: {tick.datetime}")
# 1. 计算成交量增量
last_volume = self.last_tick_volume.get(tick.vt_symbol, 0)
delta_volume = tick.volume - last_volume
# 处理CTP开盘或重连时成交量突变/重置的情况
if delta_volume < 0 or last_volume == 0:
delta_volume = tick.volume if last_volume == 0 else 0 # 首次tick算全部,后续重置算0或特殊处理
self.last_tick_volume[tick.vt_symbol] = tick.volume
# 2. 存储最新 Tick 数据到 Redis
tick_key = f"{tick.vt_symbol.upper()}_tick"
# 转换为字典存储 (eval在原始代码中使用,这里用更安全的方式)
# 注意:vn.py的TickData.datetime已经是校准过的本地时间datetime对象
tick_dict = {
'交易日': tick.datetime.strftime("%Y%m%d"), # 或使用tick内字段,vnpy tick可能没有trading_day
'日期': tick.datetime.strftime("%Y-%m-%d"),
'合约代码': tick.symbol,
'vt_symbol': tick.vt_symbol,
'品种代码': re.sub(r'[^a-zA-Z]', '', tick.symbol),
'最新价': tick.last_price,
'数量': tick.volume,
'持仓量': tick.open_interest,
'涨停板价': tick.limit_up,
'跌停板价': tick.limit_down,
'最后修改时间': tick.datetime.strftime("%H:%M:%S"),
'最后修改毫秒': tick.datetime.microsecond // 1000,
'申买价一': tick.bid_price_1,
'申买量一': tick.bid_volume_1,
'申卖价一': tick.ask_price_1,
'申卖量一': tick.ask_volume_1,
'日期时间': tick.datetime.strftime("%Y-%m-%d %H:%M:%S"),
'vol': delta_volume # 存储计算出的增量成交量
}
try:
# 使用 HSET 存储字典,比存储字符串更灵活
self.redis_client.hset(tick_key, mapping=tick_dict)
# print(f"Redis 更新 Tick HASH: {tick_key}")
except Exception as e:
print(f"写入Redis Tick HASH失败 ({tick_key}): {e}")
# 3. 更新或生成1分钟 K线 (仅当有实际成交量时)
if delta_volume > 0 and tick.last_price > 0: # 过滤掉集合竞价或无成交量的tick
self._update_bar(tick, delta_volume)
def _update_bar(self, tick: TickData, delta_volume: int):
"""根据Tick更新1分钟K线"""
bar = self.bar_m1.get(tick.vt_symbol)
contract = self.contracts.get(tick.vt_symbol)
if not contract:
# print(f"警告: 无法找到合约信息 {tick.vt_symbol},跳过K线合成")
return
# K线时间戳,对齐到分钟开始
bar_time = tick.datetime.replace(second=0, microsecond=0)
bar_key_new = f"{tick.vt_symbol.upper()}_1m_new"
bar_key_close = f"{tick.vt_symbol.upper()}_1m_close"
# 计算成交额增量
tick_turnover = delta_volume * tick.last_price * contract.size
if not bar:
# 创建新Bar
bar = BarData(
gateway_name=tick.gateway_name,
symbol=tick.symbol,
exchange=tick.exchange,
datetime=bar_time,
interval=Interval.MINUTE,
volume=delta_volume,
turnover=tick_turnover,
open_interest=tick.open_interest,
open_price=tick.last_price,
high_price=tick.last_price,
low_price=tick.last_price,
close_price=tick.last_price
)
self.bar_m1[tick.vt_symbol] = bar
# print(f"创建新Bar: {tick.vt_symbol} @ {bar_time}")
elif bar.datetime == bar_time:
# 更新当前Bar
bar.high_price = max(bar.high_price, tick.last_price)
bar.low_price = min(bar.low_price, tick.last_price)
bar.close_price = tick.last_price
bar.volume += delta_volume
bar.turnover += tick_turnover
bar.open_interest = tick.open_interest
# print(f"更新Bar: {tick.vt_symbol} @ {bar_time}")
else:
# 分钟切换,旧Bar结束
# 1. 保存旧Bar到Redis Close List
closed_bar_list = [
bar.datetime.strftime("%Y-%m-%d %H:%M:%S"),
bar.open_price, bar.high_price, bar.low_price, bar.close_price,
bar.volume, bar.turnover, bar.open_interest
]
try:
self.redis_client.rpush(bar_key_close, str(closed_bar_list)) # 列表转字符串存储
# print(f"Redis 关闭 K线 List: {bar_key_close} @ {bar.datetime}")
except Exception as e:
print(f"写入Redis K线List失败 ({bar_key_close}): {e}")
# 2. 创建新Bar
bar = BarData(
gateway_name=tick.gateway_name,
symbol=tick.symbol,
exchange=tick.exchange,
datetime=bar_time,
interval=Interval.MINUTE,
volume=delta_volume,
turnover=tick_turnover,
open_interest=tick.open_interest,
open_price=tick.last_price,
high_price=tick.last_price,
low_price=tick.last_price,
close_price=tick.last_price
)
self.bar_m1[tick.vt_symbol] = bar
# print(f"分钟切换,创建新Bar: {tick.vt_symbol} @ {bar_time}")
# 3. 更新Redis中的实时Bar ('_1m_new')
# 存储为 HASH 更易于读取特定字段
new_bar_dict = {
"trade_time": bar.datetime.strftime("%Y-%m-%d %H:%M:%S"),
"open": bar.open_price,
"high": bar.high_price,
"low": bar.low_price,
"close": bar.close_price,
"volume": bar.volume,
"turnover": bar.turnover,
"open_interest": bar.open_interest,
}
try:
self.redis_client.hset(bar_key_new, mapping=new_bar_dict)
# print(f"Redis 更新实时 K线 HASH: {bar_key_new}")
except Exception as e:
print(f"写入Redis实时K线 HASH失败 ({bar_key_new}): {e}")
def process_timer_event(self, event: Event):
"""处理定时器事件,用于超时闭合K线"""
self._timer_count += 1
if self._timer_count < 15: # 每 15 * 1秒 = 15秒 检查一次
return
self._timer_count = 0 # 重置计数器
if not self.redis_client:
return
now = datetime.now()
# print(f"定时检查K线超时: {now}")
symbols_to_clear = []
for vt_symbol, bar in self.bar_m1.items():
# 如果bar的最后更新时间(用bar.datetime近似)距离现在超过一定时间(例如90秒)
# 并且当前时间不在交易时段内(这个判断比较复杂,暂时简化为只看时间差)
if (now - bar.datetime) > timedelta(seconds=90):
print(f"检测到超时K线,准备闭合: {vt_symbol} @ {bar.datetime}")
bar_key_close = f"{vt_symbol.upper()}_1m_close"
closed_bar_list = [
bar.datetime.strftime("%Y-%m-%d %H:%M:%S"),
bar.open_price, bar.high_price, bar.low_price, bar.close_price,
bar.volume, bar.turnover, bar.open_interest
]
try:
self.redis_client.rpush(bar_key_close, str(closed_bar_list))
print(f"Redis 超时关闭 K线 List: {bar_key_close}")
symbols_to_clear.append(vt_symbol)
except Exception as e:
print(f"Redis超时关闭K线失败 ({bar_key_close}): {e}")
# 清理内存中已闭合的Bar
for vt_symbol in symbols_to_clear:
if vt_symbol in self.bar_m1:
del self.bar_m1[vt_symbol]
print(f"已从内存清理超时K线: {vt_symbol}")
# --- 2. 智能下单映射模块 ---
class SmartOrderPlacer:
"""
根据持仓和规则决定开平仓标志,并发送订单
改编自 ctp_function_交易映射.py
"""
def __init__(self, main_engine: MainEngine, redis_cli: Optional[redis.Redis]):
self.main_engine = main_engine
self.redis_client = redis_cli
self.contracts: Dict[str, ContractData] = {}
# 需要区分平今的交易所列表
self.shfe_ine_exchanges = {Exchange.SHFE, Exchange.INE}
# 股指期货等需要锁仓的品种前缀 (根据实际需要调整)
self.lock_codes = {"IF", "IC", "IM", "IH"}
print("智能下单处理器初始化完成")
def update_contract(self, contract: ContractData):
"""更新合约信息"""
self.contracts[contract.vt_symbol] = contract
def get_latest_price(self, vt_symbol: str) -> Optional[Tuple[float, float]]:
"""获取最新买一卖一价 (优先从vn.py获取,失败则尝试Redis)"""
tick = self.main_engine.get_tick(vt_symbol)
if tick and tick.bid_price_1 > 0 and tick.ask_price_1 > 0:
return tick.bid_price_1, tick.ask_price_1
elif self.redis_client:
# 尝试从Redis获取 (假设Redis里存了HASH)
tick_key = f"{vt_symbol.upper()}_tick"
try:
tick_dict = self.redis_client.hgetall(tick_key)
bid_price = float(tick_dict.get('申买价一', 0))
ask_price = float(tick_dict.get('申卖价一', 0))
if bid_price > 0 and ask_price > 0:
return bid_price, ask_price
except Exception as e:
print(f"从Redis获取价格失败 ({tick_key}): {e}")
return None
return None
def place_smart_order(
self,
vt_symbol: str,
direction: Direction,
volume: float,
order_type: OrderType = OrderType.LIMIT, # 默认限价单
lock: bool = False, # 是否强制锁仓(暂未使用)
extra_price_ticks: int = 5 # 超价tick数
) -> Optional[str]: # 返回vn.py的订单号 vt_orderid
"""
智能下单主函数
Args:
vt_symbol (str): 合约代码.交易所
direction (Direction): 方向 (买/卖)
volume (float): 数量
order_type (OrderType): 订单类型 (LIMIT, MARKET, etc.)
lock (bool): 是否强制锁仓 (目前影响不大,由品种决定)
extra_price_ticks (int): 限价单超价的tick数
Returns:
Optional[str]: vn.py格式的订单ID (vt_orderid),如果下单失败则返回None
"""
contract = self.contracts.get(vt_symbol)
if not contract:
print(f"错误: 找不到合约信息 {vt_symbol}")
return None
position = self.main_engine.get_position(vt_symbol) # vn.py 会自动合并多空头寸到一个对象
if not position: # 如果没有持仓记录,创建一个空的
position = PositionData(gateway_name=".", symbol=contract.symbol, exchange=contract.exchange)
price_tuple = self.get_latest_price(vt_symbol)
if not price_tuple:
print(f"错误: 无法获取 {vt_symbol} 的最新买卖价")
return None
bid_price, ask_price = price_tuple
# 1. 确定价格
order_price = 0.0
if order_type == OrderType.LIMIT:
if direction == Direction.LONG: # 买入
order_price = ask_price + contract.pricetick * extra_price_ticks
else: # 卖出
order_price = bid_price - contract.pricetick * extra_price_ticks
# 价格需要对齐到最小变动价位
order_price = round(round(order_price / contract.pricetick) * contract.pricetick, 10)
# 防止价格超出涨跌停
order_price = max(order_price, contract.lower_limit)
order_price = min(order_price, contract.upper_limit)
if order_price <= 0:
print(f"错误: 计算出的限价单价格异常 {order_price} for {vt_symbol}")
return None
# 2. 确定开平标志 (核心逻辑)
offset = Offset.OPEN # 默认开仓
product_code = re.sub(r'[^a-zA-Z]', '', contract.symbol).upper()
is_lock_product = product_code in self.lock_codes
is_shfe_ine = contract.exchange in self.shfe_ine_exchanges
if direction == Direction.LONG: # 计划买入
# 如果持有空仓,则需要平仓
if position.short_pos > 0:
# 对于上期/能源所,优先平今
if is_shfe_ine and position.short_td >= volume:
offset = Offset.CLOSETODAY
# 否则,平昨仓 (vn.py的Offset.CLOSE代表平昨优先)
else:
offset = Offset.CLOSE
# 注意:这里简化了逻辑,没有处理平昨不够再平今的情况
# vn.py CTP网关通常会自动处理Close请求(先平昨再平今),除非交易所严格要求
# 如果需要严格控制,需要拆分订单,先下Close平昨,再下CloseToday平今
else: # 计划卖出
# 如果持有多仓,则需要平仓
if position.long_pos > 0:
# 股指类产品优先锁仓(即开反向仓)
if is_lock_product and position.long_td > 0: # 只有日内持仓才倾向于锁仓
offset = Offset.OPEN # 通过开反向仓实现锁仓
# 对于上期/能源所,优先平今
elif is_shfe_ine and position.long_td >= volume:
offset = Offset.CLOSETODAY
# 否则,平昨仓
else:
offset = Offset.CLOSE
# 同样,简化了先平昨再平今的拆单逻辑
# 如果经过判断仍然是开仓,并且目标方向已有持仓,打印提示(可能重复开仓)
if offset == Offset.OPEN:
if (direction == Direction.LONG and position.long_pos > 0) or \
(direction == Direction.SHORT and position.short_pos > 0):
print(f"提示: {vt_symbol} {direction.value} {offset.value},但已有同向持仓 {position.long_pos if direction == Direction.LONG else position.short_pos}")
# 3. 构建订单请求
req = OrderRequest(
symbol=contract.symbol,
exchange=contract.exchange,
direction=direction,
offset=offset,
type=order_type,
volume=volume,
price=order_price,
reference=f"smart_{int(time.time()*1000)}" # 增加唯一性
)
# 4. 发送订单
vt_orderid = self.main_engine.send_order(req, contract.gateway_name)
if vt_orderid:
print(f"智能下单成功: {vt_orderid} | {vt_symbol} {direction.value} {volume} @ {order_price if order_price > 0 else '市价'} | 开平: {offset.value}")
return vt_orderid
else:
print(f"智能下单失败: {vt_symbol} {direction.value} {volume}")
return None
def rollover_and_unlock(self):
"""执行换月和解锁逻辑 (待实现)"""
print("换月解锁功能暂未实现")
# 实现逻辑:
# 1. 获取所有持仓 main_engine.get_all_positions()
# 2. 对比持仓合约与主力合约字典 (需要外部提供)
# 3. 如果持有非主力合约,生成平非主力、开主力的OrderRequest并发送
# 4. 检查是否存在锁仓情况 (多空持仓同时存在且昨仓>0)
# 5. 如果需要解锁,生成对锁平仓的OrderRequest并发送 (注意区分平今/平昨)
pass
# --- 3. 主应用模块 ---
class MyTradingApp:
"""
vn.py 应用模块,整合行情处理、下单映射和交易流程
"""
def __init__(self, main_engine: MainEngine, event_engine: EventEngine):
self.main_engine = main_engine
self.event_engine = event_engine
self.gateway_name = "CTP" # 与 connect_ctp.json 中的 gatewayName 一致
# 初始化 Redis 处理器和下单器
self.redis_handler = None
if redis_client:
self.redis_handler = RedisMarketHandler(redis_client, self.event_engine)
else:
print("警告: Redis未连接,行情存储和部分功能将不可用。")
self.order_placer = SmartOrderPlacer(self.main_engine, redis_client)
self.subscribed_symbols = set() # 记录已通过vn.py订阅的合约
print("MyTradingApp 初始化完成")
def start(self):
"""启动应用,注册事件监听"""
self.register_event_handlers()
print("事件监听器注册完成")
# 连接网关(如果尚未连接)
self.connect_gateway()
def connect_gateway(self):
"""加载并连接CTP网关"""
try:
self.main_engine.add_gateway(CtpGateway)
print(f"添加网关 {self.gateway_name} 成功")
except Exception as e:
print(f"添加网关 {self.gateway_name} 失败: {e}") # 可能已存在
# 使用配置文件连接
self.main_engine.connect(CTP_CONNECT_FILE, self.gateway_name)
print(f"尝试连接网关 {self.gateway_name} 使用配置文件 {CTP_CONNECT_FILE}...")
# 需要等待连接成功的回调或轮询状态
def register_event_handlers(self):
"""注册需要监听的事件"""
self.event_engine.register(EVENT_TICK, self.process_tick_event)
self.event_engine.register(EVENT_ORDER, self.process_order_event)
self.event_engine.register(EVENT_TRADE, self.process_trade_event)
self.event_engine.register(EVENT_POSITION, self.process_position_event)
self.event_engine.register(EVENT_ACCOUNT, self.process_account_event)
self.event_engine.register(EVENT_CONTRACT, self.process_contract_event)
self.event_engine.register(EVENT_LOG, self.process_log_event)
# --- 事件处理回调 ---
def process_tick_event(self, event: Event):
"""处理行情事件,分发给 Redis 处理器"""
if self.redis_handler:
self.redis_handler.process_tick_event(event)
def process_order_event(self, event: Event):
"""处理订单回报事件"""
order: OrderData = event.data
# 可以根据需要添加逻辑,例如更新UI、记录日志等
# print(f"订单更新: {order.vt_orderid} | Status: {order.status.value} | Filled: {order.traded}/{order.volume}")
pass
def process_trade_event(self, event: Event):
"""处理成交回报事件"""
trade: TradeData = event.data
print(f"*** 成交通知 ***: {trade.vt_orderid} | {trade.direction.value} {trade.offset.value} | {trade.volume} @ {trade.price}")
# 成交后可以触发其他逻辑,如止盈止损计算等
pass
def process_position_event(self, event: Event):
"""处理持仓更新事件"""
position: PositionData = event.data
# vn.py网关内部会维护持仓,这里主要是为了日志或触发特定逻辑
# print(f"持仓更新: {position.vt_symbol} | Long: {position.long_pos} (TD:{position.long_td}, YD:{position.long_yd}) | Short: {position.short_pos} (TD:{position.short_td}, YD:{position.short_yd})")
pass
def process_account_event(self, event: Event):
"""处理账户资金事件"""
account: AccountData = event.data
# print(f"账户更新: {account.accountid} | Balance: {account.balance:.2f} | Available: {account.available:.2f}")
pass
def process_contract_event(self, event: Event):
"""处理合约信息事件,更新内部存储"""
contract: ContractData = event.data
if self.redis_handler:
self.redis_handler.update_contract(contract)
self.order_placer.update_contract(contract)
def process_log_event(self, event: Event):
"""处理日志事件"""
log: LogData = event.data
# 避免打印过多行情日志,可以过滤
if "onRtnDepthMarketData" not in log.msg and "查询行情快照频率" not in log.msg :
print(f"日志: [{log.level.name}] {log.msg}")
# --- 对外交易接口 ---
def subscribe(self, symbols: list):
"""订阅行情"""
gateway = self.main_engine.get_gateway(self.gateway_name)
if not gateway or not gateway.active:
print(f"错误: 网关 {self.gateway_name} 未连接或未激活")
return
print(f"尝试订阅行情: {symbols}")
for symbol in symbols:
# 获取合约信息,重点是交易所
contract = self.main_engine.get_contract(symbol) # 这里symbol可能是简写如 ag2412
if not contract:
print(f"警告: 无法找到合约 {symbol} 的信息,无法订阅")
continue
vt_symbol = contract.vt_symbol # 获取完整的 vt_symbol
req = SubscribeRequest(symbol=contract.symbol, exchange=contract.exchange)
self.main_engine.subscribe(req, gateway.gateway_name)
self.subscribed_symbols.add(vt_symbol)
if self.redis_handler:
self.redis_handler.subscribe(vt_symbol) # 通知Redis处理器也关注这个合约
print(f"已发送 {vt_symbol} 的订阅请求")
time.sleep(0.1) # 防止订阅过快
def buy(self, vt_symbol: str, volume: float, price: float = 0.0, order_type: OrderType = OrderType.LIMIT, **kwargs) -> Optional[str]:
"""简化买入接口"""
# 如果price > 0, 优先使用传入价格构建限价单请求
if price > 0 and order_type == OrderType.LIMIT:
contract = self.contracts.get(vt_symbol)
if not contract:
print(f"错误: 找不到合约信息 {vt_symbol}")
return None
req = OrderRequest(
symbol=contract.symbol,
exchange=contract.exchange,
direction=Direction.LONG,
offset=Offset.OPEN, # 简化接口先默认为开仓,智能下单会判断
type=OrderType.LIMIT,
volume=volume,
price=price,
reference=f"simple_buy_{int(time.time()*1000)}"
)
# 注意:这里构建了简单的OrderRequest,但仍应通过SmartOrderPlacer处理开平
# return self.main_engine.send_order(req, contract.gateway_name)
# **修正:应调用智能下单器**
print("提示: buy接口调用智能下单器进行开平判断...")
return self.order_placer.place_smart_order(vt_symbol, Direction.LONG, volume, order_type=order_type)
else: # 否则,使用智能下单(传入价格0或市价单类型)
return self.order_placer.place_smart_order(vt_symbol, Direction.LONG, volume, order_type=order_type)
def sell(self, vt_symbol: str, volume: float, price: float = 0.0, order_type: OrderType = OrderType.LIMIT, **kwargs) -> Optional[str]:
"""简化卖出接口(通常用于平多仓或开空仓)"""
# 同样,优先使用传入价格构建限价单
if price > 0 and order_type == OrderType.LIMIT:
contract = self.contracts.get(vt_symbol)
if not contract:
print(f"错误: 找不到合约信息 {vt_symbol}")
return None
# **修正:应调用智能下单器**
print("提示: sell接口调用智能下单器进行开平判断...")
return self.order_placer.place_smart_order(vt_symbol, Direction.SHORT, volume, order_type=order_type)
else:
return self.order_placer.place_smart_order(vt_symbol, Direction.SHORT, volume, order_type=order_type)
def cover(self, vt_symbol: str, volume: float, price: float = 0.0, order_type: OrderType = OrderType.LIMIT, **kwargs) -> Optional[str]:
"""简化买平接口(通常用于平空仓)"""
# **修正:应调用智能下单器**
print("提示: cover接口调用智能下单器进行开平判断...")
return self.order_placer.place_smart_order(vt_symbol, Direction.LONG, volume, order_type=order_type)
def stop(self):
"""停止应用"""
self.main_engine.close()
print("应用已停止")
# --- 主程序入口 ---
def main():
"""主函数"""
# 创建事件引擎和主引擎
event_engine = EventEngine()
main_engine = MainEngine(event_engine)
# 创建并启动交易应用
trading_app = MyTradingApp(main_engine, event_engine)
trading_app.start() # 会连接网关并注册事件
print("等待CTP网关连接成功...")
# 需要等待网关连接成功,这里简单等待几秒
# 正式应用需要检查网关状态或等待特定事件
time.sleep(10)
# 获取 CTP 网关对象
gateway = main_engine.get_gateway(trading_app.gateway_name)
if not gateway or not gateway.active:
print(f"网关 {trading_app.gateway_name} 连接失败,请检查配置和网络。程序退出。")
return
print(f"网关 {trading_app.gateway_name} 连接成功。")
# 订阅行情示例 (从配置文件或列表读取)
# 注意:symbol应该是合约代码,如 ag2412, IF2412
symbols_to_subscribe = ["ag2412", "au2412", "IF2412"] # 示例合约
trading_app.subscribe(symbols_to_subscribe)
print("行情订阅已发送,等待行情数据...")
time.sleep(5) # 等待合约信息和初始行情
# --- 交易指令示例 ---
vt_symbol_ag = "ag2412.SHFE" # 使用 vn.py 的 vt_symbol 格式
vt_symbol_if = "IF2412.CFFEX"
# 示例1: 使用智能下单买入开仓 ag 1手 (自动判断开平)
print("\n--- 示例1: 智能买入开仓 ag ---")
trading_app.buy(vt_symbol_ag, 1)
time.sleep(2) # 等待订单回报
# 示例2: 再次智能买入 ag 1手 (如果上一单成交,这次还是开仓)
print("\n--- 示例2: 再次智能买入 ag ---")
trading_app.buy(vt_symbol_ag, 1)
time.sleep(2)
# 示例3: 智能卖出平仓 ag 1手 (假设之前有买入持仓)
print("\n--- 示例3: 智能卖出平仓 ag ---")
trading_app.sell(vt_symbol_ag, 1)
time.sleep(2)
# 示例4: 智能卖出开仓 IF 1手
print("\n--- 示例4: 智能卖出开仓 IF ---")
trading_app.sell(vt_symbol_if, 1)
time.sleep(2)
# 示例5: 智能买入平仓 IF 1手 (股指,假设之前有卖出持仓)
print("\n--- 示例5: 智能买入平仓 IF ---")
trading_app.buy(vt_symbol_if, 1)
time.sleep(2)
# 阻塞主线程,使事件引擎可以继续运行
print("\n按 CTRL+C 退出程序")
while True:
try:
time.sleep(1)
except KeyboardInterrupt:
print("收到退出信号...")
break
# 关闭
trading_app.stop()
if __name__ == "__main__":
main()
代码解释:
-
Redis 连接与配置:
- 代码首先尝试连接 Redis 服务器。如果失败,会打印警告,
RedisMarketHandler的相关功能将不可用。 CTP_CONNECT_FILE定义了 CTP 账户的 JSON 配置文件名。
- 代码首先尝试连接 Redis 服务器。如果失败,会打印警告,
-
RedisMarketHandler类:- 初始化 (
__init__): 接收 Redis 客户端和 vn.py 事件引擎实例。初始化用于存储K线、最新成交量和合约信息的字典。注册定时器事件处理函数。 update_contract: 由MyTradingApp调用,用于接收并存储 vn.py 推送的合约详细信息(乘数、最小变动价位等)。subscribe: 记录需要处理行情的vt_symbol。process_tick_event:- 这是核心的行情处理函数,由事件引擎在收到
EVENT_TICK时调用。 - 计算增量成交量 (
delta_volume),并处理开盘/重连时成交量可能重置的问题。 - 将处理后的 Tick 信息(包含增量成交量
vol)以 HASH 格式存入 Redis(键格式:VT_SYMBOL_tick)。使用 HASH 比存储字符串更方便读取特定字段。 - 如果
delta_volume > 0且价格有效,调用_update_bar处理K线。
- 这是核心的行情处理函数,由事件引擎在收到
_update_bar:- 根据 Tick 数据和合约信息(乘数)计算成交额。
- 判断当前 Tick 属于哪个1分钟 K线。
- 创建新 Bar: 如果是该合约的第一个 Tick 或新的一分钟开始。
- 更新当前 Bar: 如果 Tick 属于正在进行的 K线。
- 闭合旧 Bar: 当分钟切换时,将完成的 K线数据(列表转字符串)存入 Redis 的 List 中(键格式:
VT_SYMBOL_1m_close),rpush实现追加。 - 更新实时 Bar: 将当前正在形成的 K线数据(无论新建还是更新)以 HASH 格式写入 Redis(键格式:
VT_SYMBOL_1m_new),hset实现覆盖更新。
process_timer_event: 定期(每15秒)检查bar_m1中的 K线,如果某个 K线 的时间戳距离当前时间过长(例如90秒),则认为该 K线 超时未更新(可能收盘或休市),将其数据存入 Redis 的_1m_close列表,并从内存bar_m1中移除。
- 初始化 (
-
SmartOrderPlacer类:- 初始化 (
__init__): 接收MainEngine(用于发单、查行情、查持仓、查合约)和 Redis 客户端实例。 update_contract: 存储合约信息,特别是交易所信息(用于判断平今规则)、pricetick和size。get_latest_price: 优先从main_engine.get_tick获取最新 Tick 的买一卖一价。如果失败,则尝试从 Redis 读取之前RedisMarketHandler存入的 Tick HASH 数据。place_smart_order:- 获取信息: 获取目标合约
ContractData、当前PositionData、最新买卖价。 - 计算价格: 对限价单计算超价,并确保价格在涨跌停内且符合最小变动价位。
- 判断开平 (
Offset): 这是核心逻辑:- 默认
Offset.OPEN。 - 检查反向持仓是否存在 (
position.short_pos或position.long_pos)。 - 如果存在反向持仓:
- 检查是否为股指期货等需要锁仓的品种 (
is_lock_product) 且有日内反向持仓,如果是,则保持Offset.OPEN(开反向仓锁仓)。 - 检查是否为上期所/能源中心合约 (
is_shfe_ine) 且今日反向持仓 (position.short_td或position.long_td) 足够平掉本次下单量,如果是,则设置Offset.CLOSETODAY。 - 其他情况(包括平昨仓或交易所无特殊要求),设置
Offset.CLOSE(vn.py 的 CTP 网关通常会智能处理 Close:先平昨再平今)。
- 检查是否为股指期货等需要锁仓的品种 (
- 注意: 此处简化了需要严格区分平昨、平今并拆分订单的复杂情况。
- 默认
- 构建
OrderRequest: 使用 vn.py 的OrderRequest对象,填入计算好的价格、判断好的Offset等信息。 - 发送订单: 调用
main_engine.send_order发送订单请求。 - 返回 vn.py 生成的
vt_orderid或None。
- 获取信息: 获取目标合约
rollover_and_unlock: 提供了接口,但具体逻辑需要根据实际需求实现。
- 初始化 (
-
MyTradingApp类:- 初始化 (
__init__): 创建MainEngine和EventEngine的引用,初始化RedisMarketHandler(如果 Redis 可用) 和SmartOrderPlacer。 start: 注册各类事件的处理函数,并调用connect_gateway。connect_gateway: 添加CtpGateway到MainEngine并使用配置文件发起连接。register_event_handlers: 将 vn.py 的各种事件(Tick, Order, Trade, Position, Account, Contract, Log)绑定到相应的process_..._event方法。process_..._event方法:process_tick_event: 将 Tick 事件转发给redis_handler处理。process_contract_event: 将合约信息分别更新到redis_handler和order_placer。- 其他事件处理函数目前主要用于打印日志或作为未来扩展的入口。vn.py 网关本身会处理大部分状态更新。
subscribe: 接收合约代码列表(如ag2412),查找对应的ContractData,构建SubscribeRequest,并通过main_engine.subscribe发送订阅请求。同时通知redis_handler开始处理该合约的行情。buy/sell/cover: 提供给上层策略调用的简化接口。它们内部都调用order_placer.place_smart_order来执行下单,从而利用其智能判断开平的功能。
- 初始化 (
-
main函数:- 创建
MainEngine和EventEngine。 - 创建
MyTradingApp实例并启动。 - 等待网关连接成功。
- 调用
trading_app.subscribe订阅所需合约的行情。 - 包含了一些调用
trading_app.buy/sell的示例,演示如何触发智能下单。 - 最后进入一个循环阻塞主线程,允许事件引擎处理事件,直到用户按
CTRL+C退出。 - 退出前调用
trading_app.stop()关闭主引擎。
- 创建
这个框架将原始代码的功能模块化,并利用了 vn.py 的事件驱动机制和内置的 CTP 网关功能,使得结构更清晰,更容易扩展。行情处理和下单决策的核心逻辑得以保留和适配。
魔乐社区(Modelers.cn) 是一个中立、公益的人工智能社区,提供人工智能工具、模型、数据的托管、展示与应用协同服务,为人工智能开发及爱好者搭建开放的学习交流平台。社区通过理事会方式运作,由全产业链共同建设、共同运营、共同享有,推动国产AI生态繁荣发展。
更多推荐


所有评论(0)