实盘量化QMT踩坑记录(三)


对之前的小市值策略修正了几个Bug,并增加了RSRS-MV择时。同时最近又发现了QMT的如下几个问题:

  1. 有时候会出现pandas库找不到等问题,重启一下QMT即可。

  2. 对于get_market_data_ex()等函数,即便事先没有下载数据,调用时似乎也会自动去下载,所以不提前下载数据也能跑。

  3. QMT似乎不适合做回测。上篇已经提到过,QMT在回测的时候,一些函数不会动态地修改“今天”:如果某个函数有end_time参数,默认返回的都是当前(真实)时间节点的数据,并不会在回测模式下动态地进行调整。因此,在回测的时候,凡是有end_time参数的函数,我都要根据ContextInfo.barpos来获取时间,然后显式地指定end_time。但是对于一些不接受end_time参数的函数,如get_instrumentdetail(),我们就无从知晓其返回的是当前信息,还是回测过程中当时的信息。我在迅投文档上也找不到相关说明,从这个角度来说,QMT似乎不怎么适合做回测。

以下是我使用的一个成长小市值策略回测版本的代码,加了RSRS-MV择时,修复了之前的几个bug。如果你有其他问题,也可以联系我。

#coding:gbk
"""
成长小市值RSRS-MV择时策略-回测
"""
import pandas as pd
import numpy as np
import time
import statsmodels.api as sm
from datetime import timedelta, date

class A:
    """Empty attribute container; fields are attached to its instance at runtime."""


# Module-level instance used by every function in this file to share strategy
# settings and state (account, signal history, holdings, order lists).
g = A()

def init(ContextInfo):
    """Populate the shared state ``g`` and seed the RSRS slope history.

    Sets the account, order codes and strategy parameters on ``g``, stores
    the wall-clock previous calendar day (``date.today()`` minus one day,
    formatted ``%Y%m%d``) in ``g.yesterday``, then calls
    ``initial_slope_series`` to pre-fill ``g.beta_l`` / ``g.R2_l``.

    Args:
        ContextInfo: platform context object; only ``set_account`` is called
            on it here, and it is forwarded to ``initial_slope_series``.
    """
    # Account settings.
    g.acct = 'testS'
    g.acct_type = 'STOCK'
    ContextInfo.set_account(g.acct)

    # Operation codes handed to passorder() for buy / sell orders.
    g.buy_code = 23
    g.sell_code = 24

    # Stock-selection parameters.
    g.buy_stock_count = 5   # number of stocks the portfolio targets
    g.ipo_days = 180        # minimum listing age in days (see filter_new_stock)

    # RSRS timing parameters.
    g.N = 18                    # bars per high~low regression window
    g.M = 600                   # number of slopes used for standardisation
    g.ref_stock = '000300.SH'   # CSI 300 index, the timing reference
    g.score_threshold = 0.7
    g.R2_l = []
    g.beta_l = []
    g.rsrs = 'KEEP'

    # Calendar day before the real (wall-clock) today, not the backtest date.
    g.yesterday = (date.today() - timedelta(days=1)).strftime('%Y%m%d')

    initial_slope_series(ContextInfo)

def initial_slope_series(ContextInfo):
    """Seed ``g.beta_l`` and ``g.R2_l`` with historical RSRS regressions.

    Requests ``g.N + g.M`` daily high/low bars of ``g.ref_stock`` ending at
    ``g.yesterday``. For each of the ``g.M`` start offsets an OLS of ``high``
    on ``low`` (with intercept) is fitted over ``g.N`` consecutive bars; the
    slope and R-squared are appended to ``g.beta_l`` / ``g.R2_l``. Finally the
    most recent entry of each list is dropped.

    Args:
        ContextInfo: platform context object providing ``get_market_data_ex``.
    """
    bars = ContextInfo.get_market_data_ex(
        ['high', 'low'], [g.ref_stock],
        period='1d', end_time=g.yesterday, count=g.N + g.M,
    )[g.ref_stock]

    for start in range(g.M):
        stop = start + g.N
        highs = bars.high[start:stop]
        lows = bars.low[start:stop]
        fit = sm.OLS(highs, sm.add_constant(lows)).fit()
        g.beta_l.append(fit.params[1])   # params[0] is the constant, [1] the slope
        g.R2_l.append(fit.rsquared)

    # Discard the newest regression from both histories.
    g.beta_l = g.beta_l[:-1]
    g.R2_l = g.R2_l[:-1]

def get_rsrs_signal(ContextInfo):
    """Compute today's RSRS timing signal for ``g.ref_stock``.

    Fits an OLS of daily ``high`` on ``low`` over the last ``g.N`` bars ending
    at ``g.yesterday``, appends the slope and R-squared to ``g.beta_l`` /
    ``g.R2_l``, standardises the newest slope against the last ``g.M`` slopes
    and multiplies the z-score by R-squared.

    Returns:
        ``"SELL"`` when the score is below ``-g.score_threshold``;
        ``"BUY"`` when the score is above ``g.score_threshold`` and the mean of
        the last 20 closes exceeds the mean of the 20 closes ending two bars
        earlier; ``"KEEP"`` otherwise.
    """
    ref = g.ref_stock
    close_data = ContextInfo.get_market_data_ex(
        ['close'], [ref], period='1d', end_time=g.yesterday, count=20 + 2)[ref]
    high_low = ContextInfo.get_market_data_ex(
        ['high', 'low'], [ref], period='1d', end_time=g.yesterday, count=g.N)[ref]
    high_low = high_low.dropna()

    fit = sm.OLS(high_low.high, sm.add_constant(high_low.low)).fit()
    slope = fit.params[1]
    r_squared = fit.rsquared
    g.beta_l.append(slope)
    g.R2_l.append(r_squared)

    # Standardise the newest slope against the trailing window of slopes.
    window = g.beta_l[-g.M:]
    z_score = (window[-1] - np.mean(window)) / np.std(window)
    score = z_score * r_squared

    if score < -g.score_threshold:
        return "SELL"
    if score > g.score_threshold:
        closes = close_data['close']
        # 20-bar mean now versus the 20-bar mean two bars ago.
        if np.mean(closes[-20:]) > np.mean(closes[-22:-2]):
            return "BUY"
    return "KEEP"

def trade(ContextInfo):
    """Run the per-bar trading logic for the current bar.

    The bar time (``HHMMSS``) selects which step runs:

    * 10:50-11:10 - recompute the RSRS signal; on ``'SELL'`` liquidate every
      non-suspended holding.
    * 13:50-14:10 - rebuild the target list via ``prepare_stock_list`` and sell
      holdings that are no longer in it (skipped while the signal is 'SELL').
    * 14:20-14:40 - split the available cash evenly across the missing target
      stocks and buy them (skipped while the signal is 'SELL').

    Changes versus the previous version:

    * ``g.today`` / ``g.yesterday`` / ``g.start_date`` are refreshed on every
      in-session bar instead of only inside the 10:50 window, so the later
      windows no longer depend on that window having run first.
    * The buy step never selects more stocks than there are free slots; the
      old loop appended one candidate before testing the limit and therefore
      bought a stock even when no slot was free.
    * Buy volume is an integer lot count; orders are skipped when the latest
      price is missing / non-positive or when the cash buys less than one lot.
    * Sell orders are not sent for positions whose available volume is 0.
    * A missing ``g.stock_list`` is treated as an empty list.

    Args:
        ContextInfo: platform context object (bar position, market data,
            suspension lookup); also forwarded to ``passorder``.
    """
    realtime = ContextInfo.get_bar_timetag(ContextInfo.barpos)
    now = timetag_to_datetime(realtime, '%H%M%S')
    nowDate = timetag_to_datetime(realtime, '%Y%m%d %H:%M:%S')
    # Skip bars outside the trading session.
    if now < '093000' or now > "150000":
        return

    account = get_trade_detail_data(g.acct, g.acct_type, 'account')
    holdings = get_trade_detail_data(g.acct, g.acct_type, 'position')
    # code -> available (sellable) volume
    g.holdings = {i.m_strInstrumentID + '.' + i.m_strExchangeID: i.m_nCanUseVolume for i in holdings}
    if len(account) == 0:
        print(f'账号{g.acct} 未登录 请检查')
        return

    # Derive all dates from the bar timestamp so they follow the backtest clock.
    day_ms = 24 * 3600 * 1000
    g.today = timetag_to_datetime(realtime, '%Y%m%d')
    g.yesterday = timetag_to_datetime(realtime - day_ms, '%Y%m%d')
    g.start_date = timetag_to_datetime(realtime - 30 * day_ms, '%Y%m%d')

    if '111000' >= now >= '105000':
        rsrs = get_rsrs_signal(ContextInfo)
        g.rsrs = rsrs
        print(nowDate, rsrs)
        if rsrs == 'SELL' and len(g.holdings) > 0:
            print(nowDate, 'RSRS SELL, 全部卖出')
            for s, volume in g.holdings.items():
                if volume > 0 and not ContextInfo.is_suspended_stock(s):
                    passorder(g.sell_code, 1101, g.acct, s, 5, -1, volume, 2, ContextInfo)

    if '141000' >= now >= '135000':
        if g.rsrs == 'SELL':
            return
        g.stock_pool = ContextInfo.get_stock_list_in_sector('沪深A股')
        g.stock_list = prepare_stock_list(ContextInfo)
        stocks_to_sell = [s for s in g.holdings if s not in g.stock_list]
        for s in stocks_to_sell:
            volume = g.holdings[s]
            msg = f"小市值 {s} 卖出 {volume/100} 手"
            print(nowDate, msg)
            if volume > 0 and not ContextInfo.is_suspended_stock(s):
                passorder(g.sell_code, 1101, g.acct, s, 5, -1, volume, 2, ContextInfo)

    if '144000' >= now >= '142000':
        if g.rsrs == 'SELL':
            return
        cash = get_total_value(g.acct, 'STOCK')
        print('可用资金', cash)

        # Free slots can be <= 0 (e.g. an unsellable suspended holding); never
        # buy in that case.
        free_slots = max(g.buy_stock_count - len(g.holdings), 0)
        candidates = [s for s in getattr(g, 'stock_list', []) if s not in g.holdings]
        g.stocks_to_buy = candidates[:free_slots]

        if len(g.stocks_to_buy) > 0:
            value = cash / len(g.stocks_to_buy)
            for s in g.stocks_to_buy:
                # Size the order from the latest 30-minute close.
                latest_price = ContextInfo.get_market_data_ex(
                    ['close'], [s], period='30m', end_time=g.today, count=1)[s]['close'][0]
                if not latest_price > 0:  # also rejects NaN
                    print(nowDate, 'No valid latest price for stock {}, skip'.format(s))
                    continue
                # * 1.01 leaves headroom so the order does not exceed the cash.
                vol = int(value // (latest_price * 100 * 1.01))
                if vol <= 0:
                    print(nowDate, 'Not enough cash for one lot of stock {}, skip'.format(s))
                    continue
                msg = f"小市值 {s} 买入 {vol}手"
                print(nowDate, msg)
                passorder(g.buy_code, 1101, g.acct, s, 5, -1, vol * 100, 2, ContextInfo)
        else:
            print(nowDate, '无需换仓')

def handlebar(ContextInfo):
    """Bar callback: forward ``ContextInfo`` to ``trade``.

    NOTE(review): presumably invoked by the QMT platform once per bar - confirm.
    """
    trade(ContextInfo)

def prepare_stock_list(ContextInfo):
    """Select the stocks the strategy wants to hold.

    Starting from ``g.stock_pool`` the pool is narrowed step by step (listing
    age, STAR Market, ST/delisting names, financial factors), ranked by
    ``close * total_capital`` ascending, cut to the 30 smallest, filtered for
    limit-up / limit-down / suspension, and truncated to
    ``g.buy_stock_count``.

    Changes versus the previous version: the bare ``except:`` is narrowed to
    ``except Exception``; stocks whose market cap evaluates to NaN are dropped
    before ranking; the price-limit filters are not called with an empty list.

    Args:
        ContextInfo: platform context object used for market / financial data.

    Returns:
        list of stock codes, at most ``g.buy_stock_count`` long (possibly empty).
    """
    stock_pool = filter_new_stock(ContextInfo, g.stock_pool)
    # Drop STAR Market stocks.
    stock_pool = filter_kcb_stock(ContextInfo, stock_pool)
    stock_pool = filter_st_stock(ContextInfo, stock_pool)

    # Net-profit growth rate: positive values, highest 10% of the pool.
    stock_pool = get_factor_filter_list(ContextInfo, stock_pool, 'PERSHAREINDEX',
                'du_profit_rate', asc=False, p=0.1)
    # PEG: lowest 50%.
    stock_pool = get_peg_filter_list(ContextInfo, stock_pool, p=0.5)

    # ROE above 0.2 (NOTE(review): unit presumably percent - confirm).
    stock_pool = get_factor_filter_list_ex(ContextInfo, stock_pool, 'PERSHAREINDEX',
                'du_return_on_equity', t=0.2)

    # Net profit above 20,000,000.
    stock_pool = get_factor_filter_list_ex(ContextInfo, stock_pool, 'ASHAREINCOME',
                'net_profit_incl_min_int_inc', t=20000000)

    # Year-on-year revenue growth above 0.
    stock_pool = get_factor_filter_list_ex(ContextInfo, stock_pool, 'PERSHAREINDEX',
                'inc_revenue_rate', t=0)

    # Rank the survivors by market cap (last close * total share capital).
    scores = {}
    price_data = ContextInfo.get_market_data_ex(['close'], stock_pool, period='1d',
                    end_time=g.yesterday, count=1)
    fieldList = ['CAPITALSTRUCTURE.total_capital']
    share_data = ContextInfo.get_financial_data(fieldList, stock_pool, g.start_date,
                    g.yesterday, report_type='report_time')
    for s in stock_pool:
        try:
            scores[s] = price_data[s]['close'][0] * share_data[s]['total_capital'][0]
        except Exception:
            # Best effort: a stock without usable data is simply left out.
            print('Can not get mkt info for stock {}, skip'.format(s))
    if len(scores) == 0:
        return []

    df = pd.DataFrame.from_dict(scores, orient='index')
    df.columns = ['cap']
    # NaN caps would otherwise sort last and could still be picked when the
    # pool is small.
    df = df.dropna().sort_values(by=['cap'])
    stock_pool = df.index.values.tolist()[:30]
    if not stock_pool:
        return []

    # Price-limit filters go last: they query market data per pool and the
    # call frequency of get_market_data_ex is limited, so keep the pool small.
    stock_pool = filter_limitup_stock(ContextInfo, stock_pool)
    stock_pool = filter_limitdown_stock(ContextInfo, stock_pool)
    # Drop suspended stocks.
    stock_pool = filter_paused_stock(ContextInfo, stock_pool)
    return stock_pool[:g.buy_stock_count]

def get_total_value(account_id,datatype):
    """Return the account's currently available cash.

    Queries ``get_trade_detail_data(account_id, datatype, 'ACCOUNT')`` and
    returns the ``m_dAvailable`` field of the last record in the result.

    Args:
        account_id: account identifier.
        datatype: account type string (for example ``'STOCK'``).

    Returns:
        ``m_dAvailable`` of the last returned record, or 0 when no record is
        returned.
    """
    available = 0
    for record in get_trade_detail_data(account_id, datatype, 'ACCOUNT'):
        available = record.m_dAvailable
    return available

def get_peg_filter_list(ContextInfo, stock_list, p= 0.5):
    """Keep the stocks with the lowest positive PEG ratio.

    PEG is computed as ``(close / s_fa_eps_basic) / du_profit_rate``, i.e.
    price-to-earnings divided by the profit growth rate. The previous version
    computed ``close * du_profit_rate / s_fa_eps_basic`` (PE *times* growth),
    which ranks low-growth stocks first - the opposite of what a PEG screen is
    meant to do.

    Stocks whose data cannot be read are skipped with a message; NaN, infinite
    and non-positive scores are discarded.

    Args:
        ContextInfo: platform context object used for market / financial data.
        stock_list: candidate stock codes.
        p: fraction of ``len(stock_list)`` to keep (default 0.5).

    Returns:
        list of at most ``int(p * len(stock_list))`` codes, lowest PEG first.
    """
    data = ContextInfo.get_financial_data(
        ['PERSHAREINDEX.s_fa_eps_basic', 'PERSHAREINDEX.du_profit_rate'],
        stock_list, g.start_date, g.yesterday, report_type='report_time')
    price = ContextInfo.get_market_data_ex(
        ['close'], stock_list, period='1d', end_time=g.yesterday, count=1)

    score_list = []
    filter_stocks = []
    for s in stock_list:
        try:
            pe = price[s]['close'][0] / data[s]['s_fa_eps_basic'][0]
            peg = pe / data[s]['du_profit_rate'][0]
        except Exception:
            # Best effort: missing data or a zero divisor drops the stock.
            print('Caculate PEG Error for stock {}, skip'.format(s))
            continue
        score_list.append(peg)
        filter_stocks.append(s)

    df = pd.DataFrame({'code': filter_stocks, 'score': score_list})
    # numpy division by zero yields inf instead of raising; treat it as missing.
    df = df.replace([np.inf, -np.inf], np.nan).dropna()
    df = df[df['score'] > 0]
    df = df.sort_values(by='score', ascending=True)
    filter_list = list(df.code)[0:int(p * len(stock_list))]
    return filter_list

def get_factor_filter_list(ContextInfo, stock_list, table, field, asc=True,p=0.3):
    """Rank stocks by one financial field and keep a leading fraction.

    Reads ``table.field`` for every stock between ``g.start_date`` and
    ``g.yesterday``, takes the first value of each series, discards NaN and
    non-positive values, sorts by the value and keeps the first
    ``int(p * len(stock_list))`` codes.

    Args:
        ContextInfo: platform context object providing ``get_financial_data``.
        stock_list: candidate stock codes.
        table: financial table name (for example ``'PERSHAREINDEX'``).
        field: field name inside ``table``.
        asc: sort ascending when True, descending when False.
        p: fraction of ``len(stock_list)`` to keep.

    Returns:
        list of stock codes in sorted order.
    """
    data = ContextInfo.get_financial_data(
        [table + '.' + field], stock_list,
        g.start_date, g.yesterday, report_type='report_time')

    first_values = [data[s][field][0] for s in stock_list]
    ranked = pd.DataFrame({'code': stock_list, 'score': first_values}).dropna()
    ranked = ranked[ranked['score'] > 0].sort_values(by='score', ascending=asc)

    keep = int(p * len(stock_list))
    return list(ranked['code'])[0:keep]

def get_factor_filter_list_ex(ContextInfo, stock_list, table, field, t=0):
    """Keep the stocks whose financial field is strictly above a threshold.

    Reads ``table.field`` for every stock between ``g.start_date`` and
    ``g.yesterday``, takes the first value of each series, discards NaN values
    and keeps the codes whose value is greater than ``t``.

    Args:
        ContextInfo: platform context object providing ``get_financial_data``.
        stock_list: candidate stock codes.
        table: financial table name.
        field: field name inside ``table``.
        t: exclusive lower bound for the field value (default 0).

    Returns:
        list of stock codes, in the order of ``stock_list``.
    """
    data = ContextInfo.get_financial_data(
        [table + '.' + field], stock_list,
        g.start_date, g.yesterday, report_type='report_time')

    first_values = [data[s][field][0] for s in stock_list]
    table_df = pd.DataFrame({'code': stock_list, 'score': first_values}).dropna()
    passed = table_df[table_df['score'] > t]
    return list(passed['code'])

# 2-1 Filter out suspended stocks
def filter_paused_stock(ContextInfo,stock_list):
    """Return the stocks for which ``is_suspended_stock(stock, 1)`` is falsy.

    Args:
        ContextInfo: platform context object providing ``is_suspended_stock``.
        stock_list: candidate stock codes.

    Returns:
        list of codes, in input order, that are not reported as suspended.
    """
    tradable = []
    for code in stock_list:
        if ContextInfo.is_suspended_stock(code, 1):
            continue
        tradable.append(code)
    return tradable

def filter_new_stock(ContextInfo, stock_list):
    """Drop stocks listed within the last ``g.ipo_days`` days of the current bar.

    The cutoff is the current bar's timestamp minus ``g.ipo_days`` days,
    converted to an integer ``YYYYMMDD``. A stock is kept when the
    ``'OpenDate'`` entry of ``get_instrumentdetail`` is strictly smaller than
    that cutoff.

    Args:
        ContextInfo: platform context object (bar timestamp, instrument detail).
        stock_list: candidate stock codes.

    Returns:
        list of codes, in input order, that were listed before the cutoff.
    """
    day_ms = 24 * 3600 * 1000
    bar_time = ContextInfo.get_bar_timetag(ContextInfo.barpos)
    cutoff = int(timetag_to_datetime(bar_time - g.ipo_days * day_ms, '%Y%m%d'))

    seasoned = []
    for code in stock_list:
        if ContextInfo.get_instrumentdetail(code)['OpenDate'] < cutoff:
            seasoned.append(code)
    return seasoned

# 2-2 Filter out ST stocks and other stocks carrying a delisting marker
def filter_st_stock(ContextInfo,stock_list):
    """Drop stocks whose name marks them as ST / delisting, or with no instrument id.

    A stock is removed when its name contains ``'ST'``, ``'*'`` or ``'退'``,
    or when the ``'InstrumentID'`` entry of ``get_instrumentdetail`` is None.

    The name is now looked up once per stock; the previous version called
    ``get_stock_name`` up to three times for the same code.

    Args:
        ContextInfo: platform context object providing ``get_stock_name`` and
            ``get_instrumentdetail``.
        stock_list: candidate stock codes.

    Returns:
        list of codes, in input order, that passed both checks.
    """
    markers = ('ST', '*', '退')
    kept = []
    for stock in stock_list:
        name = ContextInfo.get_stock_name(stock)
        if any(marker in name for marker in markers):
            continue
        # Only queried for names that passed, as in the original short-circuit.
        if ContextInfo.get_instrumentdetail(stock)['InstrumentID'] is not None:
            kept.append(stock)
    return kept

# Filter out STAR Market stocks
def filter_kcb_stock(ContextInfo, stock_list):
    """Drop STAR Market codes from ``stock_list``.

    Codes starting with ``688`` (STAR Market shares) or ``689`` (STAR Market
    depositary receipts) are removed; the previous version only checked
    ``688``.

    Args:
        ContextInfo: unused; kept for a signature parallel to the other filters.
        stock_list: candidate stock codes such as ``'600000.SH'``.

    Returns:
        list of codes, in input order, that are not on the STAR Market.
    """
    star_prefixes = ('688', '689')
    return [stock for stock in stock_list if not stock.startswith(star_prefixes)]

# Filter out stocks that are at limit-up
def filter_limitup_stock(ContextInfo, stock_list):
    """Drop stocks whose latest price has reached the limit-up price.

    The limit-up price is the previous daily close times 1.1, rounded to
    0.01. The previous version compared against the *unrounded* product, so
    a stock trading exactly at the limit (for example close 10.01 -> limit
    11.01, raw product 11.011) still passed the filter.

    Stocks already in ``g.holdings`` are always kept, even at limit-up, so
    that a held stock is not replaced by another pick merely because it was
    filtered out here.

    NOTE(review): a flat 10% limit is assumed for every code, as before;
    boards with a different limit are not special-cased - confirm.

    Args:
        ContextInfo: platform context object providing ``get_market_data_ex``.
        stock_list: candidate stock codes.

    Returns:
        list of codes, in input order, that are held or below limit-up.
    """
    price_data = ContextInfo.get_market_data_ex(['close'], stock_list, period='30m',
                    end_time=g.today, count=1)
    prev_close = ContextInfo.get_market_data_ex(['close'], stock_list, period='1d',
                    end_time=g.yesterday, count=1)
    filter_list = []
    for stock in stock_list:
        if stock in g.holdings:
            filter_list.append(stock)
            continue
        last = price_data[stock]['close'][0]
        # + 1e-6 makes exact .xx5 products round half up despite float error.
        limit_price = round(prev_close[stock]['close'][0] * 1.1 + 1e-6, 2)
        # Half a tick of tolerance absorbs float noise in the quoted price.
        if last < limit_price - 0.005:
            filter_list.append(stock)
    return filter_list

# Filter out stocks that are at limit-down
def filter_limitdown_stock(ContextInfo,stock_list):
    """Drop stocks whose latest price has reached the limit-down price.

    The limit-down price is the previous daily close times 0.9, rounded to
    0.01. The previous version compared against the *unrounded* product, so
    a stock trading exactly at the limit (for example close 10.01 -> limit
    9.01, raw product 9.009) still passed the filter.

    NOTE(review): a flat 10% limit is assumed for every code, as before;
    boards with a different limit are not special-cased - confirm.

    Args:
        ContextInfo: platform context object providing ``get_market_data_ex``.
        stock_list: candidate stock codes.

    Returns:
        list of codes, in input order, that trade above limit-down.
    """
    price_data = ContextInfo.get_market_data_ex(['close'], stock_list, period='30m',
                    end_time=g.today, count=1)
    prev_close = ContextInfo.get_market_data_ex(['close'], stock_list, period='1d',
                    end_time=g.yesterday, count=1)
    filter_list = []
    for stock in stock_list:
        last = price_data[stock]['close'][0]
        # + 1e-6 makes exact .xx5 products round half up despite float error.
        limit_price = round(prev_close[stock]['close'][0] * 0.9 + 1e-6, 2)
        # Half a tick of tolerance absorbs float noise in the quoted price.
        if last > limit_price + 0.005:
            filter_list.append(stock)
    return filter_list