Round function gives an error in PySpark when used with a UDF

uyhoqukh · posted on 2023-05-16 in Spark
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, MinMaxScaler
from pyspark.sql.functions import *
from pyspark.sql.types import FloatType

def data_preparation(df):

    unlist = udf(lambda x: round(float(list(x)[0]),3), FloatType())
    # Iterating over columns to be scaled
    for i in ["event"]:
        # VectorAssembler Transformation - Converting column to vector type
        assembler = VectorAssembler(inputCols=[i],outputCol=i+"_Vect")

        # MinMaxScaler Transformation
        scaler = MinMaxScaler(inputCol=i+"_Vect", outputCol=i+"_Scaled")

        # Pipeline of VectorAssembler and MinMaxScaler
        pipeline = Pipeline(stages=[assembler, scaler])

        # Fitting pipeline on dataframe
        df = pipeline.fit(df).transform(df).withColumn(i+"_Scaled",unlist(i+"_Scaled")).drop(i+"_Vect")

    return df

In the unlist UDF snippet above I am trying to take the first element of the list and round it to 3 decimal places, but when I call this function I get the following error:

PythonException: 
  An exception was thrown from the Python worker. Please see the stack trace below.
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/dist-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 604, in main
    process()
  File "/usr/local/lib/python3.6/dist-packages/pyspark/python/lib/pyspark.zip/pysparkx/worker.py", line 596, in process
    serializer.dump_stream(out_iter, outfile)
  File "/usr/local/lib/python3.6/dist-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 211, in dump_stream
    self.serializer.dump_stream(self._batched(iterator), stream)
  File "/usr/local/lib/python3.6/dist-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 132, in dump_stream
    for obj in iterator:
  File "/usr/local/lib/python3.6/dist-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 200, in _batched
    for item in iterator:
  File "/usr/local/lib/python3.6/dist-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 450, in mapper
    result = tuple(f(*[a[o] for o in arg_offsets]) for (arg_offsets, f) in udfs)
  File "/usr/local/lib/python3.6/dist-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 450, in <genexpr>
    result = tuple(f(*[a[o] for o in arg_offsets]) for (arg_offsets, f) in udfs)
  File "/usr/local/lib/python3.6/dist-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 85, in <lambda>
    return lambda *a: f(*a)
  File "/usr/local/lib/python3.6/dist-packages/pyspark/python/lib/pyspark.zip/pyspark/util.py", line 73, in wrapper
    return f(*args, **kwargs)
  File "<ipython-input-85-a4273b6bc9ab>", line 17, in <lambda>
  File "/usr/local/lib/python3.6/dist-packages/pyspark/python/lib/pyspark.zip/pyspark/sql/functions.py", line 1234, in round
    return Column(sc._jvm.functions.round(_to_java_column(col), scale))
AttributeError: 'NoneType' object has no attribute '_jvm'

I have tried doing the rounding separately, but then it fails at a later stage of the program. I am just looking for the cause of this problem.


wvt8vs2t 1#

The problem is in this line -

from pyspark.sql.functions import *

Try this instead -

import pyspark.sql.functions as f
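
The wildcard import shadows Python's built-in round with pyspark.sql.functions.round, and the latter cannot run inside a UDF on a worker: it needs an active SparkContext to reach the JVM, which is why the traceback ends in AttributeError: 'NoneType' object has no attribute '_jvm'. A minimal sketch of the same UDF with the Spark functions kept behind an alias (assuming the rest of the pipeline stays unchanged):

# Minimal sketch, assuming only the import style changes.
# With the alias, round inside the lambda is Python's built-in round,
# not pyspark.sql.functions.round.
import pyspark.sql.functions as f
from pyspark.sql.types import FloatType

unlist = f.udf(lambda x: round(float(list(x)[0]), 3), FloatType())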

ffscu2ro 2#

I would check whether there are any null values in the column you are taking the list from. I created a sample dataset and applied your code:

# THIS ONE WORKS
from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType

data = [(1, [3.143412412, 2.444]), (2, [2.718, 0.9090909]), (3, [1.414, 0.12312312321])]
df = spark.createDataFrame(data, ["id", "value"])
unlist = udf(lambda x: round(float(list(x)[0]), 3), FloatType())

# Apply the UDF: take the first element and round it to 3 decimal places
df = df.withColumn("value_float", unlist("value"))

Then I deliberately changed the data for id 1 so that it contains a null value at x[0]:

# THIS ONE DOESN'T WORK
data = [(1, [None, 2.444]), (2, [2.718, 0.9090909]), (3, [1.414, 0.12312312321])]
df = spark.createDataFrame(data, ["id", "value"])
unlist = udf(lambda x: round(float(list(x)[0]), 3), FloatType())

# Applying the UDF now fails once the column is evaluated, because float(None) raises
df = df.withColumn("value_float", unlist("value"))

I did not get exactly the same error as you, but a None/null appearing in the dataset will definitely break the UDF.
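
If nulls are expected in the data, a null-safe variant of the UDF is one way to cope. This is a sketch under that assumption, not part of the original answer, and unlist_safe is a hypothetical name:

import pyspark.sql.functions as f
from pyspark.sql.types import FloatType

# Hypothetical null-safe variant: return None instead of raising
# when the list itself or its first element is missing.
unlist_safe = f.udf(
    lambda x: round(float(x[0]), 3) if x is not None and x[0] is not None else None,
    FloatType(),
)

df = df.withColumn("value_float", unlist_safe("value"))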
