I have an RDD, train_rdd, whose records look like (('a',1),('b',2),('c',3)). I convert it to a DataFrame as follows:
from pyspark.sql import Row
train_label_df = train_rdd.map(lambda x: (Row(**dict(x)))).toDF()
However, some records in the RDD may be missing some of the keys, so an error occurs:
  File "/mnt/hadoop/yarn/local/usercache/hdfs/appcache/application_/container_05_000017/pyspark.zip/pyspark/worker.py", line 253, in main
    process()
  File "/mnt/hadoop/yarn/local/usercache/hdfs/appcache/application_/container_05_000017/pyspark.zip/pyspark/worker.py", line 248, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/mnt/hadoop/yarn/local/usercache/hdfs/appcache/application_/container_05_000002/pyspark.zip/pyspark/rdd.py", line 2440, in pipeline_func
  File "/mnt/hadoop/yarn/local/usercache/hdfs/appcache/application_/container_05_000002/pyspark.zip/pyspark/rdd.py", line 2440, in pipeline_func
  File "/mnt/hadoop/yarn/local/usercache/hdfs/appcache/application_/container_05_000002/pyspark.zip/pyspark/rdd.py", line 350, in func
  File "/mnt/hadoop/yarn/local/usercache/hdfs/appcache/application_/container_05_000002/pyspark.zip/pyspark/rdd.py", line 1859, in combineLocally
  File "/mnt/hadoop/yarn/local/usercache/hdfs/appcache/application_/container_05_000017/pyspark.zip/pyspark/shuffle.py", line 237, in mergeValues
    for k, v in iterator:
TypeError: cannot unpack non-iterable NoneType object
Is there another way to convert an RDD of tuples to a DataFrame?
Update:
I also tried createDataFrame:
from pyspark.sql.types import StructType, StructField, StringType
rdd = sc.parallelize([('a',1), (('a',1), ('b',2)), (('a',1), ('b',2), ('c',3) ) ])
schema = StructType([
StructField("a", StringType(), True),
StructField("b", StringType(), True),
StructField("c", StringType(), True),
])
train_label_df = sqlContext.createDataFrame(rdd, schema)
train_label_df.show()
This also raises an error:
File "/home/spark/python/pyspark/sql/types.py", line 1400, in verify_struct
"length of fields (%d)" % (len(obj), len(verifiers))))
ValueError: Length of object (2) does not match with length of fields (3)
1 Answer
You can map each tuple to a dict:
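A minimal sketch of such a mapping, not necessarily the exact code the answer had in mind. The helper name to_dict is hypothetical, and it assumes a record is either a tuple of (key, value) pairs or a single bare pair such as ('a', 1), as in the question's sample data:

```python
def to_dict(record):
    """Turn one record into a dict of key -> value.

    Accepts either a tuple of (key, value) pairs, e.g. (('a', 1), ('b', 2)),
    or a single bare pair such as ('a', 1).
    """
    if not record:
        # Empty or None records become an empty dict.
        return {}
    # A bare pair has a non-tuple first element; wrap it so dict() sees a list of pairs.
    if not isinstance(record[0], (tuple, list)):
        record = [record]
    return dict(record)


# With Spark, this would be applied per record, for example:
#   dict_rdd = train_rdd.map(to_dict)
```

Keeping the helper free of Spark imports makes it easy to check on plain Python tuples before running it on the cluster.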
Then do one of the following:
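One option could be to pad every dict to the same set of keys and build Row objects, as in the question's first attempt. This is a sketch under assumptions: COLUMNS, fill_missing, and dicts_to_df are hypothetical names, the expected keys are taken to be a, b, and c, and the input is assumed to be an RDD whose elements are already dicts.

```python
COLUMNS = ["a", "b", "c"]  # assumed full set of expected keys


def fill_missing(d, columns=COLUMNS):
    """Return a dict with every expected column, using None for absent keys."""
    return {name: d.get(name) for name in columns}


def dicts_to_df(dict_rdd, columns=COLUMNS):
    """Build a DataFrame from an RDD of dicts by way of Row objects.

    Requires an active SparkSession (or SQLContext) so that RDD.toDF() is available.
    """
    from pyspark.sql import Row  # imported here so fill_missing stays Spark-free

    return dict_rdd.map(lambda d: Row(**fill_missing(d, columns))).toDF()
```

Because toDF() infers column types from the data, a column that is None in every sampled row may not be inferable; in that case an explicit schema is the safer route.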
or
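The other option could be createDataFrame with an explicit nullable schema, so that no type inference is needed. Again a sketch under assumptions: FIELD_NAMES, to_row_tuple, and dicts_to_df_with_schema are hypothetical names, spark stands for a SparkSession or SQLContext, the input is an RDD of dicts, and LongType is chosen because the sample values are integers.

```python
FIELD_NAMES = ["a", "b", "c"]  # assumed column names, in schema order


def to_row_tuple(d, field_names=FIELD_NAMES):
    """Order a dict's values by field name, with None where a key is absent."""
    return tuple(d.get(name) for name in field_names)


def dicts_to_df_with_schema(spark, dict_rdd, field_names=FIELD_NAMES):
    """Create a DataFrame from an RDD of dicts using an explicit nullable schema."""
    from pyspark.sql.types import LongType, StructField, StructType

    # LongType is an assumption matching the integer values in the example;
    # nullable=True lets missing keys become NULL.
    schema = StructType(
        [StructField(name, LongType(), True) for name in field_names]
    )
    # Every tuple has exactly len(field_names) items, so it matches the schema length.
    tuple_rdd = dict_rdd.map(lambda d: to_row_tuple(d, field_names))
    return spark.createDataFrame(tuple_rdd, schema)
```

Padding each record to the full field list is what avoids the "Length of object (2) does not match with length of fields (3)" error from the question. Note that the question's schema declares StringType for integer values, which would likely be rejected by schema verification even once the lengths match.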