miniQMT安装包路径 | 下载地址
安装好QMT之后,在QMT的设置里面选择安装python。
等待几分钟,python文件下载好了之后。
找到qmt的安装目录,
进去这里面
\bin.x64\Lib\site-packages\xtquant
把这个目录复制到你的python路径的site-package 下面, 就可以在你的python环境下运行miniQMT了。
当然首先还是要启动你的QMT客户端,勾选极速模式。 (不开的话连不到券商服务器,这以为这无法再linux上单独跑,wine额外另说)
比如下面的获取行情的示例代码,还有直接下单代码
异步下单
需要开通miniQMT(低门槛,低费率)的朋友,
可以扫码联系:或者添加 技术公众号:
公众号:
等待几分钟,python文件下载好了之后。
找到qmt的安装目录,
进去这里面
\bin.x64\Lib\site-packages\xtquant
把这个目录复制到你的python路径的site-package 下面, 就可以在你的python环境下运行miniQMT了。
当然首先还是要启动你的QMT客户端,勾选极速模式。 (不开的话连不到券商服务器,这以为这无法再linux上单独跑,wine额外另说)
比如下面的获取行情的示例代码,还有直接下单代码
# 用前须知
## xtdata提供和MiniQmt的交互接口,本质是和MiniQmt建立连接,由MiniQmt处理行情数据请求,再把结果回传返回到python层。使用的行情服务器以及能获取到的行情数据和MiniQmt是一致的,要检查数据或者切换连接时直接操作MiniQmt即可。
## 对于数据获取接口,使用时需要先确保MiniQmt已有所需要的数据,如果不足可以通过补充数据接口补充,再调用数据获取接口获取。
## 对于订阅接口,直接设置数据回调,数据到来时会由回调返回。订阅接收到的数据一般会保存下来,同种数据不需要再单独补充。
# 代码讲解
# 从本地python导入xtquant库,如果出现报错则说明安装失败
from xtquant import xtdata
import time
# 设定一个标的列表
code_list = ["000001.SZ"]
# 设定获取数据的周期
period = "1d"
# 下载标的行情数据
if 1:
## 为了方便用户进行数据管理,xtquant的大部分历史数据都是以压缩形式存储在本地的
## 比如行情数据,需要通过download_history_data下载,财务数据需要通过
## 所以在取历史数据之前,我们需要调用数据下载接口,将数据下载到本地
for i in code_list:
xtdata.download_history_data(i,period=period,incrementally=True) # 增量下载行情数据(开高低收,等等)到本地
xtdata.download_financial_data(code_list) # 下载财务数据到本地
xtdata.download_sector_data() # 下载板块数据到本地
# 更多数据的下载方式可以通过数据字典查询
# 读取本地历史行情数据
history_data = xtdata.get_market_data_ex(,code_list,period=period,count=-1)
print(history_data)
print("=" * 20)
# 如果需要盘中的实时行情,需要向服务器进行订阅后才能获取
# 订阅后,get_market_data函数于get_market_data_ex函数将会自动拼接本地历史行情与服务器实时行情
# 向服务器订阅数据
for i in code_list:
xtdata.subscribe_quote(i,period=period,count=-1) # 设置count = -1来取到当天所有实时行情
# 等待订阅完成
time.sleep(1)
# 获取订阅后的行情
kline_data = xtdata.get_market_data_ex(,code_list,period=period)
print(kline_data)
# 获取订阅后的行情,并以固定间隔进行刷新,预期会循环打印10次
for i in range(10):
# 这边做演示,就用for来循环了,实际使用中可以用while True
kline_data = xtdata.get_market_data_ex(,code_list,period=period)
print(kline_data)
time.sleep(3) # 三秒后再次获取行情
# 如果不想用固定间隔触发,可以以用订阅后的回调来执行
# 这种模式下当订阅的callback回调函数将会异步的执行,每当订阅的标的tick发生变化更新,callback回调函数就会被调用一次
# 本地已有的数据不会触发callback
# 定义的回测函数
## 回调函数中,data是本次触发回调的数据,只有一条
def f(data):
# print(data)
code_list = list(data.keys()) # 获取到本次触发的标的代码
kline_in_callabck = xtdata.get_market_data_ex(,code_list,period = period) # 在回调中获取klines数据
print(kline_in_callabck)
for i in code_list:
xtdata.subscribe_quote(i,period=period,count=-1,callback=f) # 订阅时设定回调函数
# 使用回调时,必须要同时使用xtdata.run()来阻塞程序,否则程序运行到最后一行就直接结束退出了。
xtdata.run()
异步下单
#coding:utf-8
import time, datetime, traceback, sys
from xtquant import xtdata
from xtquant.xttrader import XtQuantTrader, XtQuantTraderCallback
from xtquant.xttype import StockAccount
from xtquant import xtconstant
#定义一个类 创建类的实例 作为状态的容器
class _a():
pass
A = _a()
A.bought_list =
A.hsa = xtdata.get_stock_list_in_sector('沪深A股')
def interact():
"""执行后进入repl模式"""
import code
code.InteractiveConsole(locals=globals()).interact()
xtdata.download_sector_data()
def f(data):
now = datetime.datetime.now()
for stock in data:
if stock not in A.hsa:
continue
cuurent_price = data[stock]['lastPrice']
pre_price = data[stock]['lastClose']
ratio = cuurent_price / pre_price - 1 if pre_price > 0 else 0
if ratio > 0.09 and stock not in A.bought_list:
print(f"{now} 最新价 买入 {stock} 200股")
async_seq = xt_trader.order_stock_async(acc, stock, xtconstant.STOCK_BUY, 200, xtconstant.LATEST_PRICE, -1, 'strategy_name', stock)
A.bought_list.append(stock)
class MyXtQuantTraderCallback(XtQuantTraderCallback):
def on_disconnected(self):
"""
连接断开
:return:
"""
print(datetime.datetime.now(),'连接断开回调')
def on_stock_order(self, order):
"""
委托回报推送
:param order: XtOrder对象
:return:
"""
print(datetime.datetime.now(), '委托回调', order.order_remark)
def on_stock_trade(self, trade):
"""
成交变动推送
:param trade: XtTrade对象
:return:
"""
print(datetime.datetime.now(), '成交回调', trade.order_remark)
def on_order_error(self, order_error):
"""
委托失败推送
:param order_error:XtOrderError 对象
:return:
"""
# print("on order_error callback")
# print(order_error.order_id, order_error.error_id, order_error.error_msg)
print(f"委托报错回调 {order_error.order_remark} {order_error.error_msg}")
def on_cancel_error(self, cancel_error):
"""
撤单失败推送
:param cancel_error: XtCancelError 对象
:return:
"""
print(datetime.datetime.now(), sys._getframe().f_code.co_name)
def on_order_stock_async_response(self, response):
"""
异步下单回报推送
:param response: XtOrderResponse 对象
:return:
"""
print(f"异步委托回调 {response.order_remark}")
def on_cancel_order_stock_async_response(self, response):
"""
:param response: XtCancelOrderResponse 对象
:return:
"""
print(datetime.datetime.now(), sys._getframe().f_code.co_name)
def on_account_status(self, status):
"""
:param response: XtAccountStatus 对象
:return:
"""
print(datetime.datetime.now(), sys._getframe().f_code.co_name)
if __name__ == '__main__':
print("start")
#指定客户端所在路径
path = r'D:\qmt\sp3\迅投极速交易终端 睿智融科版\userdata_mini'
# 生成session id 整数类型 同时运行的策略不能重复
session_id = int(time.time())
xt_trader = XtQuantTrader(path, session_id)
# 开启主动请求接口的专用线程 开启后在on_stock_xxx回调函数里调用XtQuantTrader.query_xxx函数不会卡住回调线程,但是查询和推送的数据在时序上会变得不确定
# 详见: http://docs.thinktrader.net/vip/pages/ee0e9b/#开启主动请求接口的专用线程
# xt_trader.set_relaxed_response_order_enabled(True)
# 创建资金账号为 800068 的证券账号对象
acc = StockAccount('800068', 'STOCK')
# 创建交易回调类对象,并声明接收回调
callback = MyXtQuantTraderCallback()
xt_trader.register_callback(callback)
# 启动交易线程
xt_trader.start()
# 建立交易连接,返回0表示连接成功
connect_result = xt_trader.connect()
print('建立交易连接,返回0表示连接成功', connect_result)
# 对交易回调进行订阅,订阅后可以收到交易主推,返回0表示订阅成功
subscribe_result = xt_trader.subscribe(acc)
print('对交易回调进行订阅,订阅后可以收到交易主推,返回0表示订阅成功', subscribe_result)
#这一行是注册全推回调函数 包括下单判断 安全起见处于注释状态 确认理解效果后再放开
# xtdata.subscribe_whole_quote(["SH", "SZ"], callback=f)
# 阻塞主线程退出
xt_trader.run_forever()
# 如果使用vscode pycharm等本地编辑器 可以进入交互模式 方便调试 (把上一行的run_forever注释掉 否则不会执行到这里)
interact()
需要开通miniQMT(低门槛,低费率)的朋友,
可以扫码联系:或者添加 技术公众号:
公众号: