我有一个 Dataframe ,其中一列作为列表,另一列作为字典。但是,这是不一致的。它可能是单个元素或NULL
df = pd.DataFrame({'item_id':[1,1,1,2,3,4,4],
'shop_id':['S1','S2','S3','S2','S3','S1','S2'],
'price_list':["{'10':['S1','S2'], '20':['S3'], '30':['S4']}","{'10':['S1','S2'], '20':['S3'], '30':['S4']}","{'10':['S1','S2'], '20':['S3'], '30':['S4']}",'50','NaN',"{'10':['S1','S2','S3'],'25':['S4']}","{'10':['S1','S2','S3'],'25':['S4']}"]})
+---------+---------+--------------------------------------------------+
| item_id | shop_id | price_list |
+---------+---------+--------------------------------------------------+
| 1 | S1 | {'10': ['S1', 'S2'], '20': ['S3'], '30': ['S4']} |
| 1 | S2 | {'10': ['S1', 'S2'], '20': ['S3'], '30': ['S4']} |
| 1 | S3 | {'10': ['S1', 'S2'], '20': ['S3'], '30': ['S4']} |
| 2 | S2 | 50 |
| 3 | S3 | NaN |
| 4 | S1 | {'10': ['S1', 'S2', 'S3'], '25': ['S4']} |
| 4 | S2 | {'10': ['S1', 'S2', 'S3'], '25': ['S4']} |
+---------+---------+--------------------------------------------------+
我想把它扩大为:
+---------+---------+-------+
| item_id | shop_id | price |
+---------+---------+-------+
| 1 | S1 | 10 |
| 1 | S2 | 10 |
| 1 | S3 | 20 |
| 2 | S2 | 50 |
| 3 | S3 | NaN |
| 4 | S1 | 10 |
| 4 | S2 | 10 |
+---------+---------+-------+
我试着用敷:
def get_price(row):
if row['price_list'][0]=='{':
prices = eval(row['price_list'])
for key,value in prices.items():
if str(row['shop_id']) in value:
price = key
break
price = np.nan
else:
price = row["price_list"]
return price
df['price'] = df.apply(lambda row: get_price(row),axis=1)
(The price_list列中的dictionary元素实际上是字符串,因此我需要首先将它们作为dict进行计算。)
但是,由于我的 Dataframe 非常大,上述方法需要花费大量时间。
因此,我尝试使用多重处理。我使用多重处理的方法如下:
def get_price(row):
if row['price_list'][0]=='{':
prices = eval(row['price_list'])
for key,value in prices.items():
if str(row['shop_id']) in value:
price = key
break
price = np.nan
else:
price = row["price_list"]
return price
def parallelize(data, get_price, num_of_processes):
data_split = np.array_split(data, num_of_processes)
pool = mp.Pool(num_of_processes)
data = pd.concat(pool.map(get_price, data_split))
pool.close()
pool.join()
return data
def run_on_subset(get_price, data_subset):
data_subset['price'] = data_subset.apply(get_price, axis=1)
return data_subset
def parallelize_on_rows(data, get_price, num_of_processes):
return parallelize(data, partial(run_on_subset, get_price), num_of_processes)
df = parallelize_on_rows(df,get_price,num_processes)
现在,虽然我曾经在单核上运行apply,但它会一直运行下去。但在使用所有4个内核的多处理后,我会出现内存错误,内核死亡。
我有16GB的内存和4个内核。当脚本启动时,我已经使用了8GB。我运行的是64位Python 3.6。
我正在运行linux,只是使用多处理池。
如何在不使用更少内核的情况下,始终使用多处理来运行脚本?
1条答案
按热度按时间mf98qq941#
代替将输入拆分为
num_of_processes
,考虑将其拆分为该数目的10-20倍,这样一次只有5-10%的数据被复制到另一进程,还考虑固定map
的chunksize
参数,因为它采用默认取决于输入数目的值,该值可能不是1,因此可能会向其他进程发送比预期更多的数据。