将python函数转换为在apachespark中运行的最佳实践是什么

drkbr07n  于 2021-05-29  发布在  Hadoop
关注(0)|答案(0)|浏览(292)

我有一个python程序来处理一台计算机中的大数据(16个cpu核)。因为数据越来越大,我需要它在5台电脑上运行。我是个新手,看完一些文件后仍然感到困惑。如果有人能告诉我什么是制作小型集群的最佳方法,我将不胜感激。
以下是一些细节:
该程序试图从Dataframe数据中计算每种股票(一天一次)的每一个价格的交易量。
有3000多只股票,一天内成交10亿。数据文件(dataframe)的大小在1~2g之间。
现在一台电脑花了300天,花了3天,我希望再增加4台电脑,以缩短时间。
下面是python中的示例代码:

import sharedmem
import os
import multiprocessing as mp

def ticks_to_priceline(day=None):

    # file name for the tick dataframe file, one day for a file
    fn = get_tick_dataframe_filename_byday(day)
    with pd.HDFStore(fn, 'r') as tick_store:
        tick_dataframe = tick_store.select("tick")
        all_stock_symbols = tick_dataframe.symbol.drop_duplicates()

        sblist = []
        # cut to small chunk
        chunk = 300
        for xx in range(len(all_stock_symbols) / chunk + 1):
            sblist.append(all_stock_symbols[xx * chunk:(xx + 1) * stuck])
        # run with all cpus
        with sharedmem.MapReduce(np=mp.cpu_count()) as pool:

            def work(chunk_list):
                result = {}
                for symbol in chunk_list:
                    data = tick_dataframe[tick_dataframe.symbol == symbol]
                    if not data.empty and len(data) > 99:
                        df1 = data.loc[:,
                                       [u'timestamp', u'price', u'volume']]
                        df1['vol_diff'] = df1.volume.diff().fillna(0)
                        df2 = df1.loc[:, ['price', 'vol_diff']]
                        df2.price = df2.price.apply(int)
                        rs = df2.groupby('price').sum()
                        rs = rs.sort_index(ascending=0).reset_index()

                        result[symbol] = rs
                return result

            rslist = pool.map(work, sblist)
    return rslist

这是一个独立模式下的spark集群,我已经为测试设置好了。我的主要问题是如何重写上面的代码。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题