使用numpy改进worldquant 101 alpha因子的实现

llycmphe  于 2022-11-29  发布在  其他
关注(0)|答案(1)|浏览(306)

我试图实现WorldQuant(https://arxiv.org/pdf/1601.00991.pdf)发布的101个量化交易因子。
一个典型的因子是关于处理股票的价格和数量信息以及时间维度和股票维度。以alpha因子#4为例:(-1 * Ts_Rank(rank(low),9))。这是一个动量alpha信号。low是股票在一定时间段内的最低价格的面板。rank是对面板的每一行进行排名的横截面过程(时间快照)。Ts_Rank是将_rank面板的每一列(股票)移动到指定窗口的时间序列过程。
直观地说,Pandas Dataframe NumPy矩阵应该适合101个alpha因子的实现。下面是我目前为止使用NumPy得到的最佳实现。但是,性能太低了。在我的Intel core i7 windows机器上,用5000(交易日期)x 200(股票)矩阵作为输入运行alpha #4因子大约需要45秒。
我还遇到了DolphinDB,这是一个具有内置分析功能的时间序列数据库(* https://www.dolphindb.com/downloads.html *)。对于相同的Alpha#4因子,DolphinDB仅运行了0.04秒,比NumPy版本快1000倍。但是,DolphinDB是一个商业软件,有人知道更好的Python实现吗?或者有什么技巧可以改进我当前的python代码,以获得与DolphinDB相当的性能?

import numpy as np

def rankdata(a, method='average', *, axis=None):
    # this rankdata refer to scipy.stats.rankdata (https://github.com/scipy/scipy/blob/v1.9.1/scipy/stats/_stats_py.py#L9047-L9153)
    if method not in ('average', 'min', 'max', 'dense', 'ordinal'):
        raise ValueError('unknown method "{0}"'.format(method))

    if axis is not None:
        a = np.asarray(a)
        if a.size == 0:
            np.core.multiarray.normalize_axis_index(axis, a.ndim)
            dt = np.float64 if method == 'average' else np.int_
            return np.empty(a.shape, dtype=dt)
        return np.apply_along_axis(rankdata, axis, a, method)

    arr = np.ravel(np.asarray(a))
    algo = 'mergesort' if method == 'ordinal' else 'quicksort'
    sorter = np.argsort(arr, kind=algo)

    inv = np.empty(sorter.size, dtype=np.intp)
    inv[sorter] = np.arange(sorter.size, dtype=np.intp)

    if method == 'ordinal':
        return inv + 1

    arr = arr[sorter]
    obs = np.r_[True, arr[1:] != arr[:-1]]
    dense = obs.cumsum()[inv]

    if method == 'dense':
        return dense

    # cumulative counts of each unique value
    count = np.r_[np.nonzero(obs)[0], len(obs)]

    if method == 'max':
        return count[dense]

    if method == 'min':
        return count[dense - 1] + 1

    # average method
    return .5 * (count[dense] + count[dense - 1] + 1)

def rank(x):
    return rankdata(x,method='min',axis=1)/np.size(x, 1)

def rolling_rank(na):
    return rankdata(na.transpose(),method='min',axis=0)[-1].transpose()    

def ts_rank(x, window=10):
    a_rolled = np.lib.stride_tricks.sliding_window_view(x, window,axis = 0)
    return np.append(np.full([window-1,np.size(x, 1)],np.nan),rolling_rank(a_rolled),axis = 0)

def alpha004(data):
    return -1 * ts_rank(rank(data), 9)

import time

# The input is a 5000 by 200 matrix, where the row index represents trade date and the column index represents security ID. 
data=np.random.random((5000, 200))
start_time = time.time()
alpha004(data)
print("--- %s seconds ---" % (time.time() - start_time))

--- 44.85099506378174 seconds ---
    • 海豚数据库实现**
def WQAlpha4(low){
    return -mrank(rowRank(low, percent=true), true, 9)
}

// The input is a 5000 by 200 matrix, where the row index represents trade date and the column index represents security ID.
low = rand(1000.0,5000:200);
timer WQAlpha4(low);

Time elapsed: 44.036 ms (0.044s)
vuktfyat

vuktfyat1#

这部分代码:

return np.apply_along_axis(rankdata, axis, a, method)

......这将是相当慢的。像这样的函数应用程序意味着更多的计算在Python中运行,而相对较少的计算在C中运行。
如果你愿意稍微改变一下秩函数的定义,这里有一个更快的解决方案。具体来说,下面的代码相当于从method='min'改为method='ordinal'。在一个随机数测试数据集上,它95%的情况下与你的方法一致,只有1个不同的地方不一致。
通过沿着轴方向使用argsort,numpy可以完成整个计算,而无需使用Python。

def rank(x):
    return (data.argsort(axis=1).argsort(axis=1) + 1) / np.size(x, 1)

def ts_rank(x, window=10):
    a_rolled = np.lib.stride_tricks.sliding_window_view(x, window, axis = 0)
    rolling_rank_fast = (a_rolled.argsort(axis=2).argsort(axis=2) + 1)[:, :, -1]
    # Fill initial window - 1 rows with nan
    initial_window = np.full([window-1,np.size(x, 1)],np.nan)
    return np.append(initial_window,rolling_rank_fast,axis = 0)

def alpha004(data):
    return -1 * ts_rank(rank(data), 9)

以这个为基准,我发现它的运行速度大约快了100倍。

相关问题