Pandas多处理应用程序内存不足

xxhby3vn  于 2022-12-09  发布在  其他
关注(0)|答案(1)|浏览(142)

我有一个 Dataframe ,其中一列作为列表,另一列作为字典。但是,这是不一致的。它可能是单个元素或NULL

df = pd.DataFrame({'item_id':[1,1,1,2,3,4,4],
'shop_id':['S1','S2','S3','S2','S3','S1','S2'], 
'price_list':["{'10':['S1','S2'], '20':['S3'], '30':['S4']}","{'10':['S1','S2'], '20':['S3'], '30':['S4']}","{'10':['S1','S2'], '20':['S3'], '30':['S4']}",'50','NaN',"{'10':['S1','S2','S3'],'25':['S4']}","{'10':['S1','S2','S3'],'25':['S4']}"]})

+---------+---------+--------------------------------------------------+
| item_id | shop_id |                      price_list                  |
+---------+---------+--------------------------------------------------+
|       1 | S1      | {'10': ['S1', 'S2'], '20': ['S3'], '30': ['S4']} |
|       1 | S2      | {'10': ['S1', 'S2'], '20': ['S3'], '30': ['S4']} |
|       1 | S3      | {'10': ['S1', 'S2'], '20': ['S3'], '30': ['S4']} |
|       2 | S2      | 50                                               |
|       3 | S3      | NaN                                              |
|       4 | S1      | {'10': ['S1', 'S2', 'S3'], '25': ['S4']}         |
|       4 | S2      | {'10': ['S1', 'S2', 'S3'], '25': ['S4']}         |
+---------+---------+--------------------------------------------------+

我想把它扩大为:

+---------+---------+-------+
| item_id | shop_id | price |
+---------+---------+-------+
|       1 | S1      | 10    |
|       1 | S2      | 10    |
|       1 | S3      | 20    |
|       2 | S2      | 50    |
|       3 | S3      | NaN   |
|       4 | S1      | 10    |
|       4 | S2      | 10    |
+---------+---------+-------+

我试着用敷:

def get_price(row):
    if row['price_list'][0]=='{':
        prices = eval(row['price_list'])
        for key,value in prices.items():
            if str(row['shop_id']) in value:
                price = key
                break
            price =  np.nan
    else:
        price =  row["price_list"]
    return price

df['price'] = df.apply(lambda row: get_price(row),axis=1)

(The price_list列中的dictionary元素实际上是字符串,因此我需要首先将它们作为dict进行计算。)
但是,由于我的 Dataframe 非常大,上述方法需要花费大量时间。
因此,我尝试使用多重处理。我使用多重处理的方法如下:

def get_price(row):
    if row['price_list'][0]=='{':
        prices = eval(row['price_list'])
        for key,value in prices.items():
            if str(row['shop_id']) in value:
                price = key
                break
            price =  np.nan
    else:
        price =  row["price_list"]
    return price

def parallelize(data, get_price, num_of_processes):
    data_split = np.array_split(data, num_of_processes)
    pool = mp.Pool(num_of_processes)
    data = pd.concat(pool.map(get_price, data_split))
    pool.close()
    pool.join()
    return data

def run_on_subset(get_price, data_subset):
    data_subset['price'] = data_subset.apply(get_price, axis=1)
    return data_subset

def parallelize_on_rows(data, get_price, num_of_processes):
    return parallelize(data, partial(run_on_subset, get_price), num_of_processes)

df = parallelize_on_rows(df,get_price,num_processes)

现在,虽然我曾经在单核上运行apply,但它会一直运行下去。但在使用所有4个内核的多处理后,我会出现内存错误,内核死亡。
我有16GB的内存和4个内核。当脚本启动时,我已经使用了8GB。我运行的是64位Python 3.6。
我正在运行linux,只是使用多处理池。
如何在不使用更少内核的情况下,始终使用多处理来运行脚本?

mf98qq94

mf98qq941#

代替将输入拆分为num_of_processes,考虑将其拆分为该数目的10-20倍,这样一次只有5-10%的数据被复制到另一进程,还考虑固定mapchunksize参数,因为它采用默认取决于输入数目的值,该值可能不是1,因此可能会向其他进程发送比预期更多的数据。

def parallelize(data, get_price, num_of_processes):
    data_split = np.array_split(data, num_of_processes*20)
    pool = mp.Pool(num_of_processes)
    data = pd.concat(pool.map(get_price, data_split, chunksize=1))
    pool.close()
    pool.join()
    return data

相关问题