HDFS 如何使用pyarrowParquet地板与多处理

yrwegjxp  于 2022-12-09  发布在  HDFS
关注(0)|答案(2)|浏览(166)

我想使用pyarrowmultiprocessing同时读取多个hdfs文件,简单的python脚本可以工作(见下文),但是如果我尝试使用multiprocessing做同样的事情,它就会无限期地挂起。
我唯一的猜测是env在某种程度上是不同的,但是所有的环境变量在子进程和父进程中应该是相同的。
我已经尝试过使用print()来调试它;设置为仅1个线程。令我惊讶的是,当仅1个线程时,这甚至会失败。

**那么,可能的原因是什么?**如何调试?

编码:

import pyarrow.parquet as pq

def read_pq(file):
  table = pq.read_table(file)
  return table

##### this works #####
table = read_pq('hdfs://myns/mydata/000000_0')

###### this doesnt work #####
import multiprocessing

result_async=[]
with Pool(1) as pool:
  result_async.append( pool.apply_async(pq.read_table, args = ('hdfs://myns/mydata/000000_0',)) )
  results = [r.get() for r in result_async]  ###### hangs here indefinitely, no exceptions raised
  print(results)    ###### expecting to get List[pq.Table]

#########################
ukqbszuj

ukqbszuj1#

您是否尝试过在用户定义的函数中导入pq,以便每个进程(库所需)所需的任何初始化都可以在池中的每个进程中发生?

def read_pq(file):
  import pyarrow.parquet as pq
  table = pq.read_table(file)
  return table

###### this doesnt work #####
import multiprocessing

result_async=[]
with Pool(1) as pool:
  result_async.append( pool.apply_async(read_pq, args = ('hdfs://myns/mydata/000000_0',)) )
  results = [r.get() for r in result_async]  ###### hangs here indefinitely, no exceptions raised
  print(results)    ###### expecting to get List[pq.Table]

#########################
9q78igpj

9q78igpj2#

问题是由于我缺乏多处理经验。
解决方法是添加:

from multiprocessing import set_start_method
set_start_method("spawn")

解决方案和原因正是https://pythonspeed.com/articles/python-multiprocessing/所描述的:日志记录被分叉并导致死锁。
而且,虽然我只有“Pool(1)",但实际上我有父进程加上子进程,所以我仍然有两个进程,所以死锁问题存在。

相关问题