My goal is to call an external (dotnet) process from PySpark via RDD.pipe. Since that failed, I wanted to test piping to a simple command first:
spark = SparkSession.builder.master("local").appName("test").getOrCreate()
result_rdd = spark.sparkContext.parallelize(['1', '2', '', '3']).pipe(command).collect()
However, I get the following error message:
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0) ( executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "C:\projectpath\.venv\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\worker.py", line 686, in main
File "C:\projectpath\.venv\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\worker.py", line 676, in process
File "C:\projectpath\.venv\lib\site-packages\pyspark\rdd.py", line 540, in func
return f(iterator)
File "C:\projectpath\.venv\lib\site-packages\pyspark\rdd.py", line 1117, in func
pipe = Popen(shlex.split(command), env=env, stdin=PIPE, stdout=PIPE)
File "C:\Users\username\AppData\Local\Programs\Python\Python39\lib\subprocess.py", line 951, in __init__
self._execute_child(args, executable, preexec_fn, close_fds,
File "C:\Users\username\AppData\Local\Programs\Python\Python39\lib\subprocess.py", line 1420, in _execute_child
hp, ht, pid, tid = _winapi.CreateProcess(executable, args,
FileNotFoundError: [WinError 2] The system cannot find the file specified
1 Answer
Update: I found a workaround that makes it work for me. Looking at the PySpark implementation of the pipe function, I saw that when no env argument is given it passes an empty dictionary as the env argument to Popen, and doing the same thing with Popen directly reproduced the error. Simply adding some values to that dictionary fixed the problem:
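A minimal sketch of that workaround, assuming command is the same command string used in the question and that copying the parent process environment (os.environ) is one acceptable way to populate the env dictionary:

import os

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName("test").getOrCreate()

# Passing a non-empty env means Popen no longer receives an empty
# environment block on Windows, which is what triggered the
# FileNotFoundError above. Here we simply copy the parent environment;
# command is assumed to be the same command string as in the question.
result_rdd = (
    spark.sparkContext
    .parallelize(['1', '2', '', '3'])
    .pipe(command, env=dict(os.environ))
    .collect()
)

Any non-empty dictionary seems to have been enough for the author; copying os.environ is just a convenient choice that also keeps variables such as PATH visible to the child process.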