pysparkdataframe列值替换为pysparkversion2.3中另一个列表中的索引

byqmnocz  于 2021-07-13  发布在  Spark
关注(0)|答案(0)|浏览(168)

我有一个pysparkDataframe,列值作为一个列表。我需要用基于另一个python列表的索引替换列表中的值

outer_list = [b,c,a,e,f,d]
+--------------------+--------------------+
|             USER_ID|            OFFERIDS|
+--------------------+--------------------+
| X                  + [a,b,c]            | 
+--------------------+--------------------+
| Y                  + [d,e,f]            |
+--------------------+--------------------+

+--------------------+--------------------+--------------------+
|             USER_ID|            OFFERIDS|             INDEXED|
+--------------------+--------------------+--------------------+          
| X                  + [a,b,c]            |        [2,0,1]     |
+--------------------+--------------------+--------------------+ 
| Y                  + [d,e,f]            |        [5,3,4]     |
+--------------------+--------------------+--------------------+

在pyspark中创建自定义项不起作用

replace_index = udf(lambda x:  [outer_list.index(offer) for offer in x if offer in outer_list ], ArrayType(IntegerType()) )

谁能帮我一下,我哪里出错了

userid_offerid_index = df.withColumn('INDEXED_OFFERIDS', replace_index('OFFERIDS'))
  File "/opt/cloudera/parcels/SPARK2-2.3.0.cloudera4-1.cdh5.13.3.p0.611179/lib/spark2/python/lib/pyspark.zip/pyspark/sql/udf.py", line 186, in wrapper
  File "/opt/cloudera/parcels/SPARK2-2.3.0.cloudera4-1.cdh5.13.3.p0.611179/lib/spark2/python/lib/pyspark.zip/pyspark/sql/udf.py", line 164, in __call__
  File "/opt/cloudera/parcels/SPARK2-2.3.0.cloudera4-1.cdh5.13.3.p0.611179/lib/spark2/python/lib/pyspark.zip/pyspark/sql/udf.py", line 148, in _judf
  File "/opt/cloudera/parcels/SPARK2-2.3.0.cloudera4-1.cdh5.13.3.p0.611179/lib/spark2/python/lib/pyspark.zip/pyspark/sql/udf.py", line 157, in _create_judf
  File "/opt/cloudera/parcels/SPARK2-2.3.0.cloudera4-1.cdh5.13.3.p0.611179/lib/spark2/python/lib/pyspark.zip/pyspark/sql/udf.py", line 33, in _wrap_function
  File "/opt/cloudera/parcels/SPARK2-2.3.0.cloudera4-1.cdh5.13.3.p0.611179/lib/spark2/python/lib/pyspark.zip/pyspark/rdd.py", line 2391, in _prepare_for_python_RDD
  File "/opt/cloudera/parcels/SPARK2-2.3.0.cloudera4-1.cdh5.13.3.p0.611179/lib/spark2/python/lib/pyspark.zip/pyspark/serializers.py", line 575, in dumps
  File "/opt/cloudera/parcels/SPARK2-2.3.0.cloudera4-1.cdh5.13.3.p0.611179/lib/spark2/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 918, in dumps
  File "/opt/cloudera/parcels/SPARK2-2.3.0.cloudera4-1.cdh5.13.3.p0.611179/lib/spark2/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 249, in dump
pickle.PicklingError: Could not serialize object: Py4JError: An error occurred while calling o69.__getnewargs__. Trace:
py4j.Py4JException: Method __getnewargs__([]) does not exist
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326)
    at py4j.Gateway.invoke(Gateway.java:274)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题