def data_preparation(df):
    unlist = udf(lambda x: round(float(list(x)[0]), 3), FloatType())
    # Iterating over columns to be scaled
    for i in ["event"]:
        # VectorAssembler Transformation - Converting column to vector type
        assembler = VectorAssembler(inputCols=[i], outputCol=i + "_Vect")
        # MinMaxScaler Transformation
        scaler = MinMaxScaler(inputCol=i + "_Vect", outputCol=i + "_Scaled")
        # Pipeline of VectorAssembler and MinMaxScaler
        pipeline = Pipeline(stages=[assembler, scaler])
        # Fitting pipeline on dataframe
        df = pipeline.fit(df).transform(df).withColumn(i + "_Scaled", unlist(i + "_Scaled")).drop(i + "_Vect")
    return df
In the unlist udf snippet above, I am trying to take the first element from the list and round it to 3 decimal places. But when I use this function, it throws the following error:
PythonException:
An exception was thrown from the Python worker. Please see the stack trace below.
Traceback (most recent call last):
File "/usr/local/lib/python3.6/dist-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 604, in main
process()
File "/usr/local/lib/python3.6/dist-packages/pyspark/python/lib/pyspark.zip/pysparkx/worker.py", line 596, in process
serializer.dump_stream(out_iter, outfile)
File "/usr/local/lib/python3.6/dist-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 211, in dump_stream
self.serializer.dump_stream(self._batched(iterator), stream)
File "/usr/local/lib/python3.6/dist-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 132, in dump_stream
for obj in iterator:
File "/usr/local/lib/python3.6/dist-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 200, in _batched
for item in iterator:
File "/usr/local/lib/python3.6/dist-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 450, in mapper
result = tuple(f(*[a[o] for o in arg_offsets]) for (arg_offsets, f) in udfs)
File "/usr/local/lib/python3.6/dist-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 450, in <genexpr>
result = tuple(f(*[a[o] for o in arg_offsets]) for (arg_offsets, f) in udfs)
File "/usr/local/lib/python3.6/dist-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 85, in <lambda>
return lambda *a: f(*a)
File "/usr/local/lib/python3.6/dist-packages/pyspark/python/lib/pyspark.zip/pyspark/util.py", line 73, in wrapper
return f(*args, **kwargs)
File "<ipython-input-85-a4273b6bc9ab>", line 17, in <lambda>
File "/usr/local/lib/python3.6/dist-packages/pyspark/python/lib/pyspark.zip/pyspark/sql/functions.py", line 1234, in round
return Column(sc._jvm.functions.round(_to_java_column(col), scale))
AttributeError: 'NoneType' object has no attribute '_jvm'
I have tried doing the rounding separately, but then it errors out at a later stage of the program. I am just looking for the cause of this problem.
2 Answers
wvt8vs2t1#
The problem is in this line -
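
    unlist = udf(lambda x: round(float(list(x)[0]), 3), FloatType())

The traceback shows that round inside the lambda resolves to pyspark.sql.functions.round (note the frame at pyspark/sql/functions.py, line 1234), not to Python's built-in round. Spark's round operates on Column expressions and needs the JVM-backed SparkContext (sc._jvm); inside a udf the code runs on a Python worker where no SparkContext exists, so sc is None and you get 'NoneType' object has no attribute '_jvm'. This usually happens after a wildcard import such as "from pyspark.sql.functions import *", which shadows the built-in round.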
Try this instead - call the built-in round explicitly so Spark's version cannot be picked up (a minimal sketch; adjust to however round ended up shadowed in your session):
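
    import builtins

    # Use Python's built-in round explicitly; pyspark.sql.functions.round
    # only works on Column expressions on the driver, not inside a udf.
    unlist = udf(lambda x: builtins.round(float(list(x)[0]), 3), FloatType())
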
ffscu2ro2#
I would check whether there are any null values in the column you are taking the list from. To test this, I created a sample dataset and applied your code. The snippet below is an illustrative reconstruction, not the original: the two-column layout (an id column plus the "event" column) is an assumption, and it presumes the pipeline imports from your question are in scope with round unshadowed:
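
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()

    # Hypothetical sample data: an id column and the "event" column
    # that data_preparation() scales.
    df = spark.createDataFrame([(1, 10.0), (2, 25.0), (3, 50.0)], ["id", "event"])
    data_preparation(df).show()
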
Then I deliberately changed the entry for id 1 so that the value reaching x[0] is null, along these lines:
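
    from pyspark.sql import functions as F

    # Inject a null into "event" for id 1 so the udf's input would
    # contain a null at x[0].
    df_null = df.withColumn(
        "event",
        F.when(F.col("id") == 1, F.lit(None)).otherwise(F.col("event")),
    )
    data_preparation(df_null).show()
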
I did not get exactly the same error as you, but a None or null appearing in the dataset will definitely break the udf.
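If nulls do turn out to be the cause, one simple guard (assuming dropping incomplete rows is acceptable for your use case) is to filter them out before scaling:

    # Drop rows whose "event" value is null before running the pipeline.
    df_clean = df.na.drop(subset=["event"])
    data_preparation(df_clean).show()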