python 调用okex api_OKEX websocket API 连接Python范例
因为websocket-client新版的各种大脑降级设计 很多功能无法使用需要安装老版本websocket-client的包才能正常使用pip3 install websocket-client==0.46.0Python源码:#!/usr/bin/env python3# -*- coding: utf-8 -*-# encoding: utf-8import timeimport ss...
因为 websocket-client 新版的各种大脑降级设计 很多功能无法使用
需要安装老版本websocket-client的包才能正常使用 pip3 install websocket-client==0.46.0
Python源码:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# encoding: utf-8
import time
import ssl
import sys
import code
import json
import hashlib
import hmac
import urllib
import threading
import websocket
import zlib
import string
try:
import readline
except ImportError:
pass
pong = time.time()
class WSSubscription:
def __init__(self, instrument_id='BTC-USD-190517', market='futures', on_message=None):
self.__iid = instrument_id
self.__market = market
self.__Depth = {}
if on_message is not None:
self.__callbackEnabled = True
self.__callback = on_message
else:
self.__callbackEnabled = False
thread = threading.Thread(target=self.sub, args=())
thread.daemon = True
thread.start()
def GetDepth(self):
return self.__Depth
def subscribe(self, ws):
def operator(op, args):
message = {
'op': op,
'args': args
}
ws.send(json.dumps(message))
def run(*args):
operator('subscribe', ['%s/depth5:%s' % (self.__market, self.__iid)])
operator('subscribe', ['%s/trade:%s' % (self.__market, self.__iid)])
while True:
ws.send("ping")
time.sleep(30)
threading.Thread(target=run).start()
def sub(self):
websocket.enableTrace(False)
URL = "wss://real.okex.com:10442/ws/v3"
ws = websocket.WebSocketApp(URL,
on_message=self.incoming,
on_error=self.error_handling,
on_close=self.closing)
ws.on_open = self.subscribe
while True:
try:
ws.run_forever()
except:
pass
pass
def incoming(self,ws,message):
message = zlib.decompress(message, -zlib.MAX_WBITS)
message = message.decode('utf-8')
global pong
if 'pong' in message:
pong = time.time()
if 'asks' in message and 'bids' in message:
d = json.loads(message)
self.__Depth = d['data'][0]
if self.__callbackEnabled:
self.__callback(message)
def error_handling(self,ws,error):
print(str(error))
def closing(self,ws):
print("WebSocket Closing...")
ext.OkEXWS = WSSubscription
# 模块测试
def main():
OkEX = ext.OkEXWS('BTC-USD-190517', 'futures')
while (True):
Log(OkEX.GetDepth())
time.sleep(1)
魔乐社区(Modelers.cn) 是一个中立、公益的人工智能社区,提供人工智能工具、模型、数据的托管、展示与应用协同服务,为人工智能开发及爱好者搭建开放的学习交流平台。社区通过理事会方式运作,由全产业链共同建设、共同运营、共同享有,推动国产AI生态繁荣发展。
更多推荐


所有评论(0)