TypeError: StructType can not accept object in PySpark schema type

yfjy0ee7  posted on 2022-11-01  in Spark

I am trying to convert an RDD into a DataFrame in Spark. The RDD is created by parallelizing a list of integers, and the conversion to a DataFrame fails with "TypeError: StructType can not accept object 60651 in type <class 'int'>".
Here is the code:

  # Create a schema for the dataframe
  schema = StructType([StructField('zipcd', IntegerType(), True)])
  # Convert list to RDD
  rdd = sc.parallelize(zip_cd)  # attempted fix: enclose in []; but then 'length does not match: 29275 against 1' arises
  # rdd = rdd.map(lambda x: int(x))
  # Create data frame
  zip_cd1 = spark.createDataFrame(rdd, schema)
  # print(zip_cd1.schema)
  zip_cd1.show()

It returns the following:

  Py4JJavaError Traceback (most recent call last)
  <ipython-input-59-13ef33f842e4> in <module>
        9 zip_cd1 = spark.createDataFrame(rdd,schema)
       10 #print(zip_cd1.schema)
  ---> 11 zip_cd1.show()
  ~\Anaconda3\envs\pyspark_env\lib\site-packages\pyspark\sql\dataframe.py in show(self, n, truncate, vertical)
      482         """
      483         if isinstance(truncate, bool) and truncate:
  --> 484             print(self._jdf.showString(n, 20, vertical))
      485         else:
      486             print(self._jdf.showString(n, int(truncate), vertical))
  ~\Anaconda3\envs\pyspark_env\lib\site-packages\py4j\java_gateway.py in __call__(self, *args)
     1307
     1308         answer = self.gateway_client.send_command(command)
  -> 1309         return_value = get_return_value(
     1310             answer, self.gateway_client, self.target_id, self.name)
     1311
  ~\Anaconda3\envs\pyspark_env\lib\site-packages\pyspark\sql\utils.py in deco(*a,**kw)
      109     def deco(*a,**kw):
      110         try:
  --> 111             return f(*a,**kw)
      112         except py4j.protocol.Py4JJavaError as e:
      113             converted = convert_exception(e.java_exception)
  ~\Anaconda3\envs\pyspark_env\lib\site-packages\py4j\protocol.py in get_return_value(answer, gateway_client, target_id, name)
      324             value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
      325             if answer[1] == REFERENCE_TYPE:
  --> 326                 raise Py4JJavaError(
      327                     "An error occurred while calling {0}{1}{2}.\n".
      328                     format(target_id, ".", name), value)
  Py4JJavaError: An error occurred while calling o900.showString.
  : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 46.0 failed 1 times, most recent failure: Lost task 0.0 in stage 46.0 (TID 1240) (MTYCURB-HOLAP.ACS-JRZ.com executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "C:\Users\52269198\Anaconda3\envs\pyspark_env\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\worker.py", line 604, in main
  File "C:\Users\52269198\Anaconda3\envs\pyspark_env\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\worker.py", line 596, in process
  File "C:\Users\52269198\Anaconda3\envs\pyspark_env\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\serializers.py", line 259, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "C:\Users\52269198\Anaconda3\envs\pyspark_env\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\util.py", line 73, in wrapper
    return f(*args,**kwargs)
  File "C:\Users\52269198\Anaconda3\envs\pyspark_env\lib\site-packages\pyspark\sql\session.py", line 682, in prepare
    verify_func(obj)
  File "C:\Users\52269198\Anaconda3\envs\pyspark_env\lib\site-packages\pyspark\sql\types.py", line 1409, in verify
    verify_value(obj)
  File "C:\Users\52269198\Anaconda3\envs\pyspark_env\lib\site-packages\pyspark\sql\types.py", line 1396, in verify_struct
    raise TypeError(new_msg("StructType can not accept object %r in type %s"
  TypeError: StructType can not accept object 60651 in type <class 'int'>

zip_cd is just a list of integers, and I don't understand why it is giving me so much trouble:

  zip_cd
  [60651,
   60623,
   60077,
   60626,
   60077,
   0,
   60651,
   60644,

brccelvz1#

Your struct schema expects the input collection to have shape (n, 1), i.e. one single-element record per row, not a flat (1, n) list:

  from pyspark.sql.types import StructType, StructField, IntegerType

  zip_cd = [60651, 60623, 60077, 60626, 60077, 0, 60651, 60644]
  schema = StructType([StructField('zipcd', IntegerType(), True)])
  rdd = sc.parallelize(zip_cd)
  rdd = rdd.map(lambda x: [x])  # wrap each integer in a one-element list so every record has exactly one field
  zip_cd1 = spark.createDataFrame(rdd, schema)
  # zip_cd1 = spark.createDataFrame([[x] for x in zip_cd], schema)  # or build the DataFrame from the list directly
  zip_cd1.show()

Result:

  +-----+
  |zipcd|
  +-----+
  |60651|
  |60623|
  |60077|
  |60626|
  |60077|
  |    0|
  |60651|
  |60644|
  +-----+
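
An equivalent sketch, assuming the same sc, spark, schema and zip_cd as above: each record can also be wrapped in a pyspark.sql.Row instead of a one-element list, which likewise matches the single-field StructType.

  from pyspark.sql import Row

  # Wrap every integer in a Row so each RDD element carries exactly one field named 'zipcd'
  rdd = sc.parallelize(zip_cd).map(lambda x: Row(zipcd=x))
  zip_cd1 = spark.createDataFrame(rdd, schema)
  zip_cd1.show()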

mctunoxg2#

The createDataFrame function expects a list of lists, where each sub-list represents one row:

  zip_cd = [60651, 60623, 60077, 60626, 60077, 0, 60651, 60644]
  reformatted_ = map(lambda x: [x], zip_cd)  # wrap each integer in a one-element list (one row each)
  zip_cd1 = spark.createDataFrame(reformatted_, schema='zipcd int')

which produces the desired Spark DataFrame:

  +-----+
  |zipcd|
  +-----+
  |60651|
  |60623|
  |60077|
  |60626|
  |60077|
  |    0|
  |60651|
  |60644|
  +-----+
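
If only a single integer column is needed, another possible sketch (assuming the same spark session and zip_cd list) is to pass an atomic IntegerType() as the schema, which lets createDataFrame accept the bare integers directly; the resulting column comes back as 'value' and can be renamed afterwards.

  from pyspark.sql.types import IntegerType

  # With an atomic DataType as the schema, each element may stay a plain int;
  # the single column is named 'value' by default and is renamed to 'zipcd'
  zip_cd1 = spark.createDataFrame(zip_cd, IntegerType()).withColumnRenamed('value', 'zipcd')
  zip_cd1.show()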
