我有一个python程序来处理一台计算机中的大数据(16个cpu核)。因为数据越来越大,我需要它在5台电脑上运行。我是个新手,看完一些文件后仍然感到困惑。如果有人能告诉我什么是制作小型集群的最佳方法,我将不胜感激。
以下是一些细节:
该程序试图从Dataframe数据中计算每种股票(一天一次)的每一个价格的交易量。
有3000多只股票,一天内成交10亿。数据文件(dataframe)的大小在1~2g之间。
现在一台电脑花了300天,花了3天,我希望再增加4台电脑,以缩短时间。
下面是python中的示例代码:
import sharedmem
import os
import multiprocessing as mp
def ticks_to_priceline(day=None):
# file name for the tick dataframe file, one day for a file
fn = get_tick_dataframe_filename_byday(day)
with pd.HDFStore(fn, 'r') as tick_store:
tick_dataframe = tick_store.select("tick")
all_stock_symbols = tick_dataframe.symbol.drop_duplicates()
sblist = []
# cut to small chunk
chunk = 300
for xx in range(len(all_stock_symbols) / chunk + 1):
sblist.append(all_stock_symbols[xx * chunk:(xx + 1) * stuck])
# run with all cpus
with sharedmem.MapReduce(np=mp.cpu_count()) as pool:
def work(chunk_list):
result = {}
for symbol in chunk_list:
data = tick_dataframe[tick_dataframe.symbol == symbol]
if not data.empty and len(data) > 99:
df1 = data.loc[:,
[u'timestamp', u'price', u'volume']]
df1['vol_diff'] = df1.volume.diff().fillna(0)
df2 = df1.loc[:, ['price', 'vol_diff']]
df2.price = df2.price.apply(int)
rs = df2.groupby('price').sum()
rs = rs.sort_index(ascending=0).reset_index()
result[symbol] = rs
return result
rslist = pool.map(work, sblist)
return rslist
这是一个独立模式下的spark集群,我已经为测试设置好了。我的主要问题是如何重写上面的代码。
暂无答案!
目前还没有任何答案,快来回答吧!