How can I create an array of normally distributed values in PySpark with scipy.stats and a UDF (or by some other method)?

piv4azn7 asked on 2022-11-10 in Spark

I am currently migrating a Python script to PySpark. The Python version runs fine:


### PYTHON

import pandas as pd
import scipy.stats as st

def fnNormalDistribution(mean,std, n):
    # draw n random samples from a normal distribution with the given mean and std
    box = list(eval('st.norm')(*[mean,std]).rvs(n))
    return box

df = pd.DataFrame([[18.2500365,2.7105814157004193],
                    [9.833353,2.121324586200329],
                    [41.55563866666666,7.118716782527054]],
                    columns = ['mean','std'])
df 

|    mean    |    std   |
|------------|----------|
|   18.250037|  2.710581|
|    9.833353|  2.121325|
|   41.555639|  7.118717|

n = 100 #Example
df['random_values'] = df.apply(lambda row: fnNormalDistribution(row["mean"], row["std"], n), axis=1)

df

|    mean    |    std   |                   random_values                  |
|------------|----------|--------------------------------------------------|
|   18.250037|  2.710581|[17.752189993958638, 18.883038367927465, 16.39...]|
|    9.833353|  2.121325|[10.31806454283759, 8.732261487201594, 11.6782...]|
|   41.555639|  7.118717|[38.17469739795093, 43.16514466083524, 49.2668...]|

But when I try to migrate it to PySpark, I get the following error:


### PYSPARK

import pyspark.sql.functions as f
import pyspark.sql.types as t

def fnNormalDistribution(mean,std, n):
    box = list(eval('st.norm')(*[mean,std]).rvs(n))
    return box

udf_fnNomalDistribution = f.udf(fnNormalDistribution, t.ArrayType(t.DoubleType()))

columns = ['mean','std']
data = [(18.2500365,2.7105814157004193),
    (9.833353,2.121324586200329),
    (41.55563866666666,7.118716782527054)]

df = spark.createDataFrame(data=data,schema=columns)
df.show()

|    mean    |    std   |
|------------|----------|
|   18.250037|  2.710581|
|    9.833353|  2.121325|
|   41.555639|  7.118717|

n = 100 #Example
df = df.withColumn('random_values', udf_fnNomalDistribution('mean','std',f.lit(n)))
df.show()

PythonException: 
  An exception was thrown from the Python worker. Please see the stack trace below.
Traceback (most recent call last):
  File "C:\Spark\spark-3.1.2-bin-hadoop3.2\python\lib\pyspark.zip\pyspark\worker.py", line 604, in main
  File "C:\Spark\spark-3.1.2-bin-hadoop3.2\python\lib\pyspark.zip\pyspark\worker.py", line 596, in process
  File "C:\Spark\spark-3.1.2-bin-hadoop3.2\python\lib\pyspark.zip\pyspark\serializers.py", line 211, in dump_stream
    self.serializer.dump_stream(self._batched(iterator), stream)
  File "C:\Spark\spark-3.1.2-bin-hadoop3.2\python\lib\pyspark.zip\pyspark\serializers.py", line 132, in dump_stream
    for obj in iterator:
  File "C:\Spark\spark-3.1.2-bin-hadoop3.2\python\lib\pyspark.zip\pyspark\serializers.py", line 200, in _batched
    for item in iterator:
  File "C:\Spark\spark-3.1.2-bin-hadoop3.2\python\lib\pyspark.zip\pyspark\worker.py", line 450, in mapper
  File "C:\Spark\spark-3.1.2-bin-hadoop3.2\python\lib\pyspark.zip\pyspark\worker.py", line 450, in <genexpr>
  File "C:\Spark\spark-3.1.2-bin-hadoop3.2\python\lib\pyspark.zip\pyspark\worker.py", line 85, in <lambda>
  File "C:\Spark\spark-3.1.2-bin-hadoop3.2\python\lib\pyspark.zip\pyspark\util.py", line 73, in wrapper
    return f(*args,**kwargs)
  File "C:\Users\Ubits\AppData\Local\Temp/ipykernel_10604/2493247477.py", line 2, in fnNormalDistribution
  File "<string>", line 1, in <module>
NameError: name 'st' is not defined

Is there any way to use the same function in PySpark, or some other way to get the random_values column? I googled around but had no luck.
Thanks


ulydmbyx1#

I was trying this out, and it can indeed be fixed by moving the `scipy.stats` import inside `fnNormalDistribution`, as samkart suggested.
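For reference, here is a minimal sketch of that fix without Fugue (assuming the same Spark DataFrame `df` and session from the question); the only real change is that `scipy.stats` is imported inside the function, so the name `st` is resolved on the executors instead of only on the driver:

import pyspark.sql.functions as f
import pyspark.sql.types as t

def fnNormalDistribution(mean, std, n):
    # imported inside the UDF so `st` exists on every Spark worker
    import scipy.stats as st
    return st.norm(mean, std).rvs(n).tolist()

udf_fnNormalDistribution = f.udf(fnNormalDistribution, t.ArrayType(t.DoubleType()))

n = 100
df = df.withColumn('random_values', udf_fnNormalDistribution('mean', 'std', f.lit(n)))
df.show()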
I will leave my example here anyway, because Fugue may provide a more readable way to bring this to Spark, especially when it comes to handling the schema.

import pandas as pd

def fnNormalDistribution(mean,std, n):
    # scipy.stats is imported inside the function so it is defined on every Spark worker
    import scipy.stats as st
    box = (eval('st.norm')(*[mean,std]).rvs(n)).tolist()
    return box

df = pd.DataFrame([[18.2500365,2.7105814157004193],
                    [9.833353,2.121324586200329],
                    [41.55563866666666,7.118716782527054]],
                    columns = ['mean','std'])

n = 100 #Example

def helper(df: pd.DataFrame) -> pd.DataFrame:
    # applies fnNormalDistribution row by row and appends the samples as a new column
    df['random_values'] = df.apply(lambda row: fnNormalDistribution(row["mean"], row["std"], n), axis=1)
    return df

from fugue import transform
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# transform can take either a pandas or a Spark DataFrame as input

# If engine is None, it will run on pandas

sdf = transform(df,
          helper,
          schema="*, random_values:[float]",
          engine=spark)

sdf.show()
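As a quick local check (a sketch reusing the same `df` and `helper` from above), calling transform without an engine runs everything on pandas and returns a pandas DataFrame:

# no engine given, so this runs on pandas and returns a pandas DataFrame
pdf = transform(df, helper, schema="*, random_values:[float]")
print(pdf.head())

In the schema string, "*" keeps all input columns and "random_values:[float]" declares the new array-of-float column, which is the schema handling mentioned above.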
