永久性投资组合之一(桥水全天候策略)


并不是所有人为了追求20%以上的年化,都能够容忍超过50%以上的回撤(况且要在多年能够实现20%以上的年化也是非常困难的一件事情)。我们说风险和收益成正比,是否存在年化可能10%左右,但是回撤极小的策略呢? 桥水的全天候投资策略就是这样一种投资组合。当然,桥水内部的版本可能要复杂的多,可能也没人真正知道。一个简化的版本可以只包含如下几种投资标的:

  1. 股票(或者指数ETF)
  2. 大宗商品(如黄金)
  3. 长期国债
  4. 其他市场的指数(如纳斯达克指数)

我们看到这些个品种的风险是相对分散的,策略的关键在于确定每一种投资标的在组合中的占比。这里涉及到对风险的衡量。有很多种不同的衡量风险的标准,如波动率,方差,VaR(Value at Risk, 在险价值),C-VaR,ES等等。如果希望我们的资产组合有较少的波动,我们自然不应该给予那些风险很大的品种过高的投资占比。这里我们以ES作为风险衡量的指标,ES是Expected Shortfall 的缩写。VaR衡量的是在一个给定的置信区间,我们的损失不超过某一值的概率。ES度量的是损失在VaR水平之上的平均损失值。有兴趣的朋友可以自行查找关于风险度量的相关文献。我这边需要提醒的就是,大部分的风险度量方法,都做了如下两个假设:

  1. 价格随机游走
  2. 收益率符合正态分布

很显然这些都不是事实,所以读者也不要盲信这些风险度量指标。

我们现在来看一下我们策略的效果。我对原策略稍微做了些修改,使得读者对投资品种的占比可以设置一个先验的权重。


可以看到在过去的10年当中,虽然我们的策略年化只有5.43%, 但最大回撤只有3.86%,不到5%。 我们可以通过修改我们的先验权重,在最大回撤和收益率之间进行权衡。如果我们调高股票类资产的占比,则可以提高年化收益率,但最大回测也会相应上升,如下图所示:


这里面还有个特别关键的点,在较小的回测情况下,我们还可以合理的使用杠杆(如果你的资金成本很低的话,比如资金成本每年3%以下)来增加我们的收益(肯定有人不会满足于5点多的年化)。当年长期资本管理公司(LTCM)对债券进行套利,每一笔交易其实赚的少的可怜,就是因为其使用资金的成本极低,从而可以通过高杠杆,成倍的放大自己的收益。不过话说回来也是因为高杠杆导致了LTCM的覆灭。

最后附上代码供参考:

# Modified from https://www.joinquant.com/view/community/detail/f22b10df1b2451d981ee8838e2063314

import datetime as dt
import numpy as np
import math
from jqdata import *


def initialize(context):
    set_benchmark('511010.XSHG') # 国债ETF    2013-03-05
    set_option('use_real_price', True)
    # 关闭部分log
    log.set_level('order', 'error')
    set_slippage(FixedSlippage(0.002))
    # 交易记录,
    g.transactionRecord, g.trade_ratio, g.positions = {}, {}, {}
    g.hold_periods, g.hold_cycle = 0, 30
    g.weights = np.array([0.3, 0.2, 0.2, 0.3])
    #g.weights = np.array([0.25, 0.25, 0.25, 0.25])

    g.QuantLib = QuantLib()

    # 开盘前运行
    run_daily(before_market_open, time='before_open', reference_security='000300.XSHG')
    # 开盘时运行
    run_daily(market_open, time='open', reference_security='000300.XSHG')
    # 收盘后运行
    run_daily(after_market_close, time='after_close', reference_security='000300.XSHG')


# initialize parameters
def fun_initialize(context):
    """
    因为模拟交易时,会保留参数历史赋值,重新赋值需改名。
    为了避免参数变更后在模拟交易里不生效,单独赋值一次,
    需保留状态的参数,不能放此函数内
    """
    g.equity = ['510300.XSHG']  # 300ETF 
    g.commodities = ['518880.XSHG'] # 黄金ETF
    g.bonds = ['511010.XSHG']  # 国债ETF
    g.money_fund = ['513100.XSHG'] # 纳指ETF

    #g.confidence_level = 2.58
    g.confidence_level = 1.96

    g.pools = g.equity + g.commodities + g.bonds + g.money_fund

    # 统计交易资料
    for stock in g.pools:
        if stock not in g.transactionRecord:
            g.QuantLib.fun_createTransactionRecord(context, stock)


def before_market_open(context):
    # 初始化参数
    fun_initialize(context)

    # 此段代码仅用于发微信,可以跳过
    g.message = ""
    g.message += "Returns(盘前):" + str(round(context.portfolio.returns, 5) * 100) + "%\n"
    g.hold = {}
    for stock in g.pools:
        if stock in context.portfolio.positions:
            g.hold[stock] = context.portfolio.positions[stock].total_amount
        else:
            g.hold[stock] = 0


def after_market_close(context):
    # 此段代码仅用于发微信,可以跳过
    message = ""
    for stock in g.pools:
        beforeAmount = g.hold[stock]
        if stock in context.portfolio.positions:
            afterAmount = context.portfolio.positions[stock].total_amount
        else:
            afterAmount = 0
        #
        if beforeAmount == afterAmount:
            message += stock + " : " + str(afterAmount) + "\n"
        elif beforeAmount < afterAmount:
            message += stock + " : " + str(afterAmount) + "(+" + str(afterAmount - beforeAmount) + ")\n"
        else:
            message += stock + " : " + str(afterAmount) + "(" + str(afterAmount - beforeAmount) + ")\n"

    message += "Returns(盘后):" + str(round(context.portfolio.returns, 5) * 100) + "%"
    g.message += message

    g.message += g.QuantLib.fun_print_transactionRecord(context)
    # send_message(g.message)
    log.info(g.message)


def market_open(context):
    g.tradeRecord = ""

    if g.hold_periods == 0 or need_rebalance(context):
        rebalance(context)
        g.hold_periods = g.hold_cycle
    else:
        g.tradeRecord = ""
        g.hold_periods -= 1

    #
    fun_trade(context, g.trade_ratio)

    if g.tradeRecord != "":  # 如果打印记录不为空,则发微信
        message = "\n 今日调仓 \n"
        message += g.tradeRecord
        # send_message(message)
        log.info(message)


def rebalance(context):
    # type: (Context) -> NoReturn
    trade_ratio = fun_calc_trade_ratio(context)

    g.trade_ratio = trade_ratio

    for stock in trade_ratio:
        if stock in context.portfolio.positions:
            g.positions[stock] = context.portfolio.positions[stock].price
        else:
            g.positions[stock] = 0.0


def fun_calc_trade_ratio(context):

    def __fun_getdailyreturn(stock, freq, lag):
        # type: (str, str, int) -> np.ndarray
        hStocks = history(lag, freq, 'close', stock, df=True)
        # daily_returns = hStocks.resample('D', how='last').pct_change().fillna(value=0, method=None, axis=0).values
        daily_returns = hStocks.resample('D').last().pct_change().fillna(value=0, method=None, axis=0).iloc[:, 0].values
        return daily_returns

    def __fun_get_portfolio_ES(ratio, freq, lag, confidence_level):
        # type: (dict, str, int, float) -> float
        if confidence_level == 1.96:
            a = (1 - 0.95)
        elif confidence_level == 2.06:
            a = (1 - 0.96)
        elif confidence_level == 2.18:
            a = (1 - 0.97)
        elif confidence_level == 2.34:
            a = (1 - 0.98)
        elif confidence_level == 2.58:
            a = (1 - 0.99)
        elif confidence_level == 5:
            a = (1 - 0.99999)
        else:
            a = (1 - 0.95)

        ES = 0
        if ratio:
            daily_returns = __fun_getdailyreturn(list(ratio.keys())[0], freq, lag)
            dailyReturns_sort = sorted(daily_returns)

            count = 0
            sum_value = 0
            for i in range(len(dailyReturns_sort)):
                if i < (lag * a):
                    sum_value += dailyReturns_sort[i]
                    count += 1
            if count == 0:
                ES = 0
            else:
                ES = -(sum_value / (lag * a))

        return ES

    # def ES_daily(a,x):
    #     VaR=np.percentile(a,(1-x)*100)
    #     ES=a[a<=VaR].mean()
    #     return abs(ES)

    # def __fun_calc_stock_risk_VaR(stock_list):
    #     __portfolio_VaR = 0
    #     # __stock_ratio = g.QuantLib.fun_calc_stockWeight(stock_list)
    #     __stock_ratio = {}
    #     if stock_list:
    #         __stock_ratio[stock_list[0]] = 1
    #         daily_returns = __fun_getdailyreturn(stock_list[0], '1d', 120)
    #         __portfolio_VaR = 1 * g.confidence_level * np.std(daily_returns)
    # 
    #         if math.isnan(__portfolio_VaR):
    #             __portfolio_VaR = 0
    # 
    #     return __portfolio_VaR, __stock_ratio

    def __fun_calc_stock_risk_ES(stock_list):
        # type: (list) -> (float, dict)
        __stock_ratio = {}
        if stock_list:
            __stock_ratio[stock_list[0]] = 1

        __portfolio_ES = __fun_get_portfolio_ES(__stock_ratio, '1d', 120, g.confidence_level)
        if math.isnan(__portfolio_ES):
            __portfolio_ES = 0

        return __portfolio_ES, __stock_ratio

    def __fun_calc_trade_ratio(trade_ratio, stock_list, __equity_ratio, position, all_position):
        # type: (dict, list, dict, float, float) -> dict
        for stock in stock_list:
            if stock in trade_ratio:
                trade_ratio[stock] += round((__equity_ratio[stock] * position / all_position), 3)
            else:
                trade_ratio[stock] = round((__equity_ratio[stock] * position / all_position), 3)
        return trade_ratio

    equity_ES, equity_ratio = __fun_calc_stock_risk_ES(g.equity)
    commodities_ES, commodities_ratio = __fun_calc_stock_risk_ES(g.commodities)
    bonds_ES, bonds_ratio = __fun_calc_stock_risk_ES(g.bonds)
    money_fund_ES, money_fund_ratio = __fun_calc_stock_risk_ES(g.money_fund)

    max_ES = max(equity_ES, commodities_ES, bonds_ES, money_fund_ES)

    equity_position, commodities_position, bonds_position, money_fund_position = 0, 0, 0, 0
    if equity_ES:
        equity_position = max_ES / equity_ES
    if commodities_ES:
        commodities_position = max_ES / commodities_ES
    if bonds_ES:
        bonds_position = max_ES / bonds_ES
    if money_fund_ES:
        money_fund_position = max_ES / money_fund_ES

    total_position = equity_position + commodities_position + bonds_position + money_fund_position

    __ratio = {}

    __ratio = __fun_calc_trade_ratio(__ratio, g.equity, equity_ratio, equity_position, total_position)
    __ratio = __fun_calc_trade_ratio(__ratio, g.commodities, commodities_ratio, commodities_position, total_position)
    __ratio = __fun_calc_trade_ratio(__ratio, g.bonds, bonds_ratio, bonds_position, total_position)
    __ratio = __fun_calc_trade_ratio(__ratio, g.money_fund, money_fund_ratio, money_fund_position, total_position)

    log.info('原仓位:%s' % __ratio)
    tmp_ratio = np.array(list(__ratio.values()))
    tmp_ratio = g.weights * tmp_ratio / np.dot(g.weights, tmp_ratio)
    new_ratio = {k: tmp_ratio[i] for i, k in enumerate(__ratio)}
    log.info('调整后仓位:%s' % new_ratio)
    return new_ratio


def fun_trade(context, buyDict):
    # type: (Context, dict) -> NoReturn
    def __fun_tradeStock(_context, _stock, ratio):
        # type: (Context, str, float) -> NoReturn
        total_value = _context.portfolio.total_value
        if _stock in g.money_fund:
            g.QuantLib.fun_tradeBond(_context, _stock, total_value * ratio)
        else:
            curPrice = history(1, '1d', 'close', _stock, df=False)[_stock][-1]
            if _stock in context.portfolio.positions:
                curValue = _context.portfolio.positions[_stock].total_amount * curPrice
            else:
                curValue = 0.0
            #
            Quota = total_value * ratio
            deltaValue = abs(Quota - curValue)
            if deltaValue / Quota >= 0.25 and deltaValue > 1000:
                if Quota > curValue:
                    avg_cost = g.transactionRecord[_stock]['avg_cost']
                    if curPrice > avg_cost:  # 如果亏损,不加仓
                        cash = _context.portfolio.available_cash
                        if cash >= Quota * 0.25:
                            g.QuantLib.fun_trade(_context, _stock, Quota)
                else:
                    g.QuantLib.fun_trade(_context, _stock, Quota)

    buy_list = list(buyDict.keys())

    my_hold_stock = list(context.portfolio.positions.keys())
    portfolioValue = context.portfolio.total_value

    # 已有仓位
    holdDict = dict()
    if my_hold_stock:
        h_hold_stocks = history(1, '1d', 'close', my_hold_stock, df=False)
        for stock in my_hold_stock:
            tmpW = round((context.portfolio.positions[stock].total_amount * h_hold_stocks[stock][0]) / portfolioValue, 2)
            holdDict[stock] = float(tmpW)

    # 对已有仓位做排序
    tmpDict = {}
    for stock in holdDict:
        if stock in buyDict:
            tmpDict[stock] = round((buyDict[stock] - holdDict[stock]), 2)
    tradeOrder = sorted(tmpDict.items(), key=lambda d: d[1], reverse=False)

    # 先卖掉持仓减少的标的
    _tmplist = []
    for idx in tradeOrder:
        stock = idx[0]
        __fun_tradeStock(context, stock, buyDict[stock])
        _tmplist.append(stock)

    # 交易其他股票
    for i in range(len(buy_list)):
        stock = buy_list[i]
        if len(_tmplist) != 0:
            if stock not in _tmplist:
                __fun_tradeStock(context, stock, buyDict[stock])
        else:
            __fun_tradeStock(context, stock, buyDict[stock])


def need_rebalance(context):
    # type: (Context) -> bool
    """
    持仓中,有资产价格变化幅度超过15%,就需要重新balance
    """
    for stock in context.portfolio.positions:
        curPrice = context.portfolio.positions[stock].price
        oldPrice = g.positions[stock]
        if oldPrice != 0:
            if abs(curPrice - oldPrice) / oldPrice > 0.15:
                return True


class QuantLib(object):

    def __init__(self, _period='1d'):
        """
        周期 period  (支持’Xd’,’Xm’, X是一个正整数)
        """
        # self.period = _period
        # self.context = None
        # self.data = None
        pass

    def fun_tradeBond(self, context, stock, Value):
        # type: (Context, str, float) -> NoReturn
        curPrice = history(1, '1d', 'close', stock, df=False)[stock][-1]
        if stock in context.portfolio.positions:
            curValue = float(context.portfolio.positions[stock].total_amount * curPrice)
        else:
            curValue = 0.0
        #
        deltaValue = abs(Value - curValue)
        if deltaValue > (curPrice * 100):
            if Value > curValue:
                cash = context.portfolio.available_cash
                if cash > (curPrice * 100):
                    self.fun_trade(context, stock, Value)
            else:
                # 如果是银华日利,多卖 100 股,避免个股买少了
                if stock == '511880.XSHG':
                    Value -= curPrice * 100
                self.fun_trade(context, stock, Value)

    # 剔除上市时间较短的产品
    def fun_delNewShare(self, context, equity, delta_day):
        # type: (Context, list, int) -> list
        deltaDate = context.current_dt.date() - dt.timedelta(delta_day)

        tmpList = []
        for stock in equity:
            if get_security_info(stock).start_date < deltaDate:
                tmpList.append(stock)

        return tmpList

    def fun_trade(self, context, stock, value):
        # type: (Context, str, float) -> NoReturn
        g.tradeRecord += stock + " 调仓到 " + str(round(value, 2)) + "\n"
        # self.fun_setCommission(context, stock)
        order_target_value(stock, value)
        self.fun_record(context, stock)

    def fun_record(self, context, stock):
        # type: (Context, str) -> NoReturn
        tmpDict = g.transactionRecord.copy()
        # myPrice = history(1, '1d', 'close', stock, df=False)[stock]
        myPrice = context.portfolio.positions[stock].price
        newAmount = context.portfolio.positions[stock].total_amount
        #
        myAmount = tmpDict[stock]['amount']
        myAvg_cost = tmpDict[stock]['avg_cost']
        if myAmount != newAmount:
            # 买入
            if myAmount <= newAmount:
                myAvg_cost = ((myAvg_cost * myAmount) + myPrice * (newAmount - myAmount)) / newAmount
                # g.positions[stock] = context.portfolio.positions[stock].price
                tmpDict[stock]['buy_times'] += 1
            # 卖光
            elif newAmount == 0:
                if myPrice >= myAvg_cost:
                    tmpDict[stock]['win'] += 1
                else:
                    tmpDict[stock]['loss'] += 1
                myMargin = (myPrice - myAvg_cost) * myAmount
                if myMargin < 0:
                    if myMargin <= tmpDict[stock]['max_loss']:
                        tmpDict[stock]['max_loss'] = float(round(myMargin, 2))
                        tmpDict[stock]['max_loss_date'] = context.current_dt

                tmpDict[stock]['Margin'] += float(round(myMargin, 2))
                tmpDict[stock]['sell_times'] += 1
            # 没卖光
            elif myAmount > newAmount:
                myAvg_cost = ((myAvg_cost * myAmount) - (myPrice * (myAmount - newAmount))) / newAmount
                # g.positions[stock] = context.portfolio.positions[stock].price
                tmpDict[stock]['sell_times'] += 1

        g.tradeRecord += stock + " 持股从 " + str(myAmount) + " 变为 " + str(newAmount) + \
                         " 占比 " + str(
            100 * round((myPrice * newAmount) / context.portfolio.total_value, 2)) + "%\n"

        # renew after trade
        if newAmount == 0:
            myAvg_cost = 0
            tmpDict[stock]['standPrice'] = 0
        elif myAvg_cost > tmpDict[stock]['standPrice']:
            tmpDict[stock]['standPrice'] = float(myAvg_cost)

        myAmount = newAmount
        tmpDict[stock]['amount'] = float(myAmount)
        tmpDict[stock]['avg_cost'] = float(myAvg_cost)
        g.transactionRecord = tmpDict.copy()

    def fun_createTransactionRecord(self, context, stock):
        # type: (Context, str) -> NoReturn
        g.transactionRecord[stock] = {'amount': 0, 'avg_cost': 0, 'buy_times': 0,
                                      'sell_times': 0, 'win': 0, 'loss': 0, 'max_loss': 0, 'max_loss_date': 0,
                                      'Margin': 0,
                                      'standPrice': 0}

    def fun_print_transactionRecord(self, context):
        # type: (Context) -> str
        tmpDict = g.transactionRecord.copy()
        tmpList = list(tmpDict.keys())
        message = "\n" + "stock, Win, loss, buy_times, sell_times, Margin, max_loss, max_loss_date, avg_cost\n"
        for stock in tmpList:
            message += stock + ", "
            message += str(tmpDict[stock]['win']) + ", " + str(tmpDict[stock]['loss']) + " , "
            message += str(tmpDict[stock]['buy_times']) + ", " + str(tmpDict[stock]['sell_times']) + ", "
            message += str(tmpDict[stock]['Margin']) + ", "
            message += str(tmpDict[stock]['max_loss']) + ", " + str(tmpDict[stock]['max_loss_date']) + ", "
            message += str(tmpDict[stock]['avg_cost']) + "\n"
        message += "Returns = " + str(round(context.portfolio.returns, 5) * 100) + "%\n"

        g.transactionRecord = tmpDict.copy()

        return message