【保姆教程】使用ptrade做一个持仓监控提醒软件 (二)
接下来完成代码实现部分:
主要框架如下:
盘前我们先去读取数据库的数据:
格式很简单,就记录了代码和名字:
df = pd.read_sql('select * from tb_holding_stock_list', con=engine)
然后主要部分在 execution 这个监控函数这里。
send_message_via_wechat 这个函数是发送微信消息的。
然后基本完成了整体的代码编写,里面一些自定义的函数为了判断 下一次通知要等待多久。
因为不能因为同一个股票满足条件了,然后每隔3秒发一次微信消息。你手机会一直滴滴滴地响的。
而且很容易把其他刚出现的提示给覆盖了。
【保姆教程】使用ptrade做一个持仓监控提醒软件 (一)
主要框架如下:
盘前我们先去读取数据库的数据:
格式很简单,就记录了代码和名字:
df = pd.read_sql('select * from tb_holding_stock_list', con=engine)
def initialize(context):
# 初始化策略
engine = DBSelector().get_engine()
df = pd.read_sql('select * from tb_holding_stock_list', con=engine)
df['code']=df['code'].astype(str)
result = {}
for index, row in df.iterrows():
code = add_code_postfix(row['code'])
result[code] = {'name': row['name'], 'source': row['source']}
g.holding_stock_dict = result
g.holding_stock_list = list(result.keys())
g.__cache = Cache()
run_interval(context, execution, seconds=INTERVAL) # 扫描
def handle_data(context, data):
pass
def tick_data(context, data):
pass
def before_trading_start(context, data):
'''
盘前
'''
if DEBUG:
log.info('盘前运行开始', str(context.blotter.current_dt))
def after_trading_end(context, data):
'''
盘后
'''
if DEBUG:
log.info('盘后时间 ', str(context.blotter.current_dt))
然后主要部分在 execution 这个监控函数这里。
def execution(context):
tick_info = get_snapshot(g.holding_stock_list)
for code, tick in tick_info.items():
px_change_rate = tick['px_change_rate']
if px_change_rate > abs(HIT_TARGET):
if g.__cache.check(code):
# 通知
name = g.holding_stock_dict.get(code)['name']
source = g.holding_stock_dict.get(code)['source']
msg = '{}-{} 涨幅-{},{}'.format(code,
name, px_change_rate, source)
send_message_via_wechat(msg)
send_message_via_wechat 这个函数是发送微信消息的。
然后基本完成了整体的代码编写,里面一些自定义的函数为了判断 下一次通知要等待多久。
因为不能因为同一个股票满足条件了,然后每隔3秒发一次微信消息。你手机会一直滴滴滴地响的。
而且很容易把其他刚出现的提示给覆盖了。
【保姆教程】使用ptrade做一个持仓监控提醒软件 (一)