I am currently working on migrating a Python script to PySpark. I have a Python script that works fine:
### PYTHON
import pandas as pd
import scipy.stats as st
def fnNormalDistribution(mean,std, n):
    box = list(eval('st.norm')(*[mean,std]).rvs(n))
    return box
df = pd.DataFrame([[18.2500365,2.7105814157004193],
[9.833353,2.121324586200329],
[41.55563866666666,7.118716782527054]],
columns = ['mean','std'])
df
| mean | std |
|------------|----------|
| 18.250037| 2.710581|
| 9.833353| 2.121325|
| 41.555639| 7.118717|
n = 100 #Example
df['random_values'] = df.apply(lambda row: fnNormalDistribution(row["mean"], row["std"], n), axis=1)
df
| mean | std | random_values |
|------------|----------|--------------------------------------------------|
| 18.250037| 2.710581|[17.752189993958638, 18.883038367927465, 16.39...]|
| 9.833353| 2.121325|[10.31806454283759, 8.732261487201594, 11.6782...]|
| 41.555639| 7.118717|[38.17469739795093, 43.16514466083524, 49.2668...]|
But when I try to migrate it to PySpark, I get the following error:
### PYSPARK
import pyspark.sql.functions as f
import pyspark.sql.types as t
def fnNormalDistribution(mean,std, n):
    box = list(eval('st.norm')(*[mean,std]).rvs(n))
    return box
udf_fnNomalDistribution = f.udf(fnNormalDistribution, t.ArrayType(t.DoubleType()))
columns = ['mean','std']
data = [(18.2500365,2.7105814157004193),
(9.833353,2.121324586200329),
(41.55563866666666,7.118716782527054)]
df = spark.createDataFrame(data=data,schema=columns)
df.show()
| mean | std |
|------------|----------|
| 18.250037| 2.710581|
| 9.833353| 2.121325|
| 41.555639| 7.118717|
df = df.withColumn('random_values', udf_fnNomalDistribution('mean','std',f.lit(n)))
df.show()
PythonException:
An exception was thrown from the Python worker. Please see the stack trace below.
Traceback (most recent call last):
File "C:\Spark\spark-3.1.2-bin-hadoop3.2\python\lib\pyspark.zip\pyspark\worker.py", line 604, in main
File "C:\Spark\spark-3.1.2-bin-hadoop3.2\python\lib\pyspark.zip\pyspark\worker.py", line 596, in process
File "C:\Spark\spark-3.1.2-bin-hadoop3.2\python\lib\pyspark.zip\pyspark\serializers.py", line 211, in dump_stream
self.serializer.dump_stream(self._batched(iterator), stream)
File "C:\Spark\spark-3.1.2-bin-hadoop3.2\python\lib\pyspark.zip\pyspark\serializers.py", line 132, in dump_stream
for obj in iterator:
File "C:\Spark\spark-3.1.2-bin-hadoop3.2\python\lib\pyspark.zip\pyspark\serializers.py", line 200, in _batched
for item in iterator:
File "C:\Spark\spark-3.1.2-bin-hadoop3.2\python\lib\pyspark.zip\pyspark\worker.py", line 450, in mapper
File "C:\Spark\spark-3.1.2-bin-hadoop3.2\python\lib\pyspark.zip\pyspark\worker.py", line 450, in <genexpr>
File "C:\Spark\spark-3.1.2-bin-hadoop3.2\python\lib\pyspark.zip\pyspark\worker.py", line 85, in <lambda>
File "C:\Spark\spark-3.1.2-bin-hadoop3.2\python\lib\pyspark.zip\pyspark\util.py", line 73, in wrapper
return f(*args,**kwargs)
File "C:\Users\Ubits\AppData\Local\Temp/ipykernel_10604/2493247477.py", line 2, in fnNormalDistribution
File "<string>", line 1, in <module>
NameError: name 'st' is not defined
Is there a way to use the same function in PySpark, or another way to get the random_values column? I searched on Google without success.
Thanks
1 Answer
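I tried this, and it can indeed be fixed by moving the `st` import inside `fnNormalDistribution`, as samkart suggested. I will leave my example here because Fugue may provide a more readable way to bring this to Spark, especially when it comes to handling the schema.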
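The fix of moving the import into the function can be sketched as follows, using a plain PySpark UDF rather than Fugue. This is a minimal sketch, not the answerer's own code: the names `fn_normal_distribution` and `add_random_values` are hypothetical, and it assumes scipy is installed on every Spark worker. The likely cause of the NameError is that `st` appears only inside the string passed to `eval`, so it is not picked up as a dependency when the function is shipped to the workers; the sketch therefore also calls `st.norm` directly instead of going through `eval`.

```python
def fn_normal_distribution(mean, std, n):
    # Import inside the function so that `st` is resolved on the Spark
    # worker when the UDF runs, not only on the driver.
    import scipy.stats as st

    # Draw n samples from a normal distribution with the given mean and
    # standard deviation. tolist() yields plain Python floats, which map
    # onto ArrayType(DoubleType()).
    return st.norm(mean, std).rvs(int(n)).tolist()


def add_random_values(df, n=100):
    # Hypothetical helper: assumes `df` is a Spark DataFrame with numeric
    # 'mean' and 'std' columns. PySpark is imported lazily here so the
    # sampling function above can be tested without a Spark session.
    import pyspark.sql.functions as f
    import pyspark.sql.types as t

    udf_normal = f.udf(fn_normal_distribution, t.ArrayType(t.DoubleType()))
    return df.withColumn("random_values", udf_normal("mean", "std", f.lit(n)))
```

With the `df` from the question, `add_random_values(df, n=100).show()` should display the extra random_values column. The sampled values differ on every run because no seed is set.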