Converting an RDD to a DataFrame when the rows of the RDD have different sets of fields

s5a0g9ez posted on 2022-10-07 in Spark

I have an RDD:

    [{'systemID': '617914',
      'typeID': '1',
      'taxID': '1',
      'workerID': '1011778',
      'workerExID': '70000111',
      'number': '5',
      'shiftNumber': '167',
      'numberInShift': '6',
      'printedDate': '2022-10-03T15:38:09',
      'total': '990.0000',
      'IsEReceipt': 'false',
      'version': '1.05',
      'receiptKISExid': '023442935',
      'attribute_1': '1234567890',
      'attribute_2': '65e2b71b-c2de-4681-9cf1-29701f5ce6bb',
      'city_id': 'BC29FE50'},
     {'systemID': '617915',
      'typeID': '1',
      'taxID': '1',
      'workerID': '1011778',
      'workerExID': '70000111',
      'number': '6',
      'shiftNumber': '167',
      'numberInShift': '7',
      'printedDate': '2022-10-03T16:48:35',
      'total': '640.0000',
      'IsEReceipt': 'false',
      'version': '1.05',
      'receiptKISExid': '0234434052',
      'attribute_3': '00000000-0000-0000-0000-000000000000@@00000000-0000-0000-0000-000000000000@@00000000-0000-0000-0000-000000000000',
      'city_id': 'BC29FE50'},
     {'systemID': '617916',
      'typeID': '1',
      'taxID': '1',
      'workerID': '1011778',
      'workerExID': '70000111',
      'number': '7',
      'shiftNumber': '167',
      'numberInShift': '8',
      'printedDate': '2022-10-03T17:19:46',
      'total': '310.0000',
      'version': '1.05',
      'receiptKISExid': '0234435605',
      'attribute_1': '1234567890',
      'city_id': 'BC29FE50'}]

It should be converted to a DataFrame with these fields:

systemID, typeID, taxID, workerID, workerExID, number, shiftNumber, numberInShift, printedDate, total, IsEReceipt, version, receiptKISExid, attribute_1, attribute_2, attribute_3, city_id

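However, the fields attribute_1, attribute_2 and attribute_3 are not present in every row. After converting to a DataFrame, I only get data from the first of these fields: for example, I get attribute_1, but attribute_2 and attribute_3 are null.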
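
One way columns can silently go missing is when the schema is inferred from the first record only; as far as I know, PySpark's `RDD.toDF()` without an explicit schema does this (it looks at further rows mainly to resolve null-typed fields). The plain-Python sketch below compares the keys of the first record with the union of keys across all records. The `records` list is a hypothetical trimmed-down copy of the data above (only `systemID` and the attribute keys), and `union_of_keys` is a hypothetical helper, not a Spark API.

```python
# Hypothetical trimmed-down copy of the records above (only a few keys kept)
records = [
    {"systemID": "617914", "attribute_1": "1234567890",
     "attribute_2": "65e2b71b-c2de-4681-9cf1-29701f5ce6bb"},
    {"systemID": "617915",
     "attribute_3": "@@".join(["00000000-0000-0000-0000-000000000000"] * 3)},
    {"systemID": "617916", "attribute_1": "1234567890"},
]


def union_of_keys(rows):
    """Return every key that appears in any row, in first-seen order."""
    seen = {}
    for row in rows:
        for key in row:
            seen.setdefault(key, None)  # dict keeps insertion order
    return list(seen)


first_record_keys = list(records[0])
all_keys = union_of_keys(records)
# Keys a first-record-only schema would never see
missing_from_first = [k for k in all_keys if k not in first_record_keys]

print(first_record_keys)   # ['systemID', 'attribute_1', 'attribute_2']
print(all_keys)            # ['systemID', 'attribute_1', 'attribute_2', 'attribute_3']
print(missing_from_first)  # ['attribute_3']
```

Any key that shows up in `missing_from_first` has no column in a schema derived from the first record alone.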

This is what I currently use:

    from pyspark.sql.types import (StructType, StructField, StringType,
                                   FloatType, DateType)

    ORDER_COL_NAMES = ['systemID', 'typeID', 'taxID', 'workerID', 'workerExID',
                       'number', 'shiftNumber', 'numberInShift', 'printedDate',
                       'total', 'IsEReceipt', 'version', 'receiptKISExid',
                       'attribute_1', 'attribute_2', 'attribute_3', 'city_id']

    def set_schema():
        # One nullable field per expected column, so absent keys can be null
        schema_list = []
        for c in ORDER_COL_NAMES:
            if c == 'total':
                schema_list.append(StructField(c, FloatType(), True))
            elif c == 'printedDate':
                schema_list.append(StructField(c, DateType(), True))
            else:
                schema_list.append(StructField(c, StringType(), True))
        return StructType(schema_list)

    df_schema = set_schema()
    # `orders` is the RDD of dicts shown above; all of its values are strings,
    # while FloatType and DateType fields expect float and date/datetime objects
    order_df = orders.toDF(df_schema)
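
A sketch of one way to make the explicit schema work with rows that lack some keys: turn every dict into a tuple in `ORDER_COL_NAMES` order, using `None` for absent keys, and parse the values whose schema type is not a string. The helper name `to_row` is hypothetical, and parsing `total` with `float()` and `printedDate` with `datetime.fromisoformat()` is an assumption based on the sample values shown above.

```python
from datetime import datetime

# Column order taken from the question's ORDER_COL_NAMES
ORDER_COL_NAMES = ['systemID', 'typeID', 'taxID', 'workerID', 'workerExID',
                   'number', 'shiftNumber', 'numberInShift', 'printedDate',
                   'total', 'IsEReceipt', 'version', 'receiptKISExid',
                   'attribute_1', 'attribute_2', 'attribute_3', 'city_id']


def to_row(record):
    """Hypothetical helper: return a tuple in ORDER_COL_NAMES order.

    Keys missing from `record` become None. 'total' is parsed to float and
    'printedDate' to datetime so they are no longer plain strings.
    """
    values = []
    for col in ORDER_COL_NAMES:
        value = record.get(col)  # None when this record lacks the key
        if value is not None:
            if col == 'total':
                value = float(value)
            elif col == 'printedDate':
                value = datetime.fromisoformat(value)
        values.append(value)
    return tuple(values)


example = {'systemID': '617916', 'printedDate': '2022-10-03T17:19:46',
           'total': '310.0000', 'attribute_1': '1234567890',
           'city_id': 'BC29FE50'}
row = to_row(example)
print(row[ORDER_COL_NAMES.index('total')])        # 310.0
print(row[ORDER_COL_NAMES.index('attribute_3')])  # None
```

In Spark, the tuples could then be paired with the schema, e.g. `spark.createDataFrame(orders.map(to_row), df_schema)`, assuming `orders` is the RDD from the question. Because each tuple follows the schema's field order, absent attributes arrive as nulls instead of being dropped. Since `printedDate` also carries a time of day, `TimestampType()` may fit that column better than `DateType()`; that is a suggestion, not something the question specifies.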

Thanks in advance!

Answer #1 (zujrkrfu)

I hope the code below solves your problem; it lets Spark infer the schema instead of defining it explicitly:

    # `spark` (SparkSession) and `sc` (SparkContext) are the objects that the
    # pyspark shell and most notebooks predefine
    data = [{'systemID': '617914',
             'typeID': '1',
             'taxID': '1',
             'workerID': '1011778',
             'workerExID': '70000111',
             'number': '5',
             'shiftNumber': '167',
             'numberInShift': '6',
             'printedDate': '2022-10-03T15:38:09',
             'total': '990.0000',
             'IsEReceipt': 'false',
             'version': '1.05',
             'receiptKISExid': '023442935',
             'attribute_1': '1234567890',
             'attribute_2': '65e2b71b-c2de-4681-9cf1-29701f5ce6bb',
             'city_id': 'BC29FE50'},
            {'systemID': '617915',
             'typeID': '1',
             'taxID': '1',
             'workerID': '1011778',
             'workerExID': '70000111',
             'number': '6',
             'shiftNumber': '167',
             'numberInShift': '7',
             'printedDate': '2022-10-03T16:48:35',
             'total': '640.0000',
             'IsEReceipt': 'false',
             'version': '1.05',
             'receiptKISExid': '0234434052',
             'attribute_3': '00000000-0000-0000-0000-000000000000@@00000000-0000-0000-0000-000000000000@@00000000-0000-0000-0000-000000000000',
             'city_id': 'BC29FE50'},
            {'systemID': '617916',
             'typeID': '1',
             'taxID': '1',
             'workerID': '1011778',
             'workerExID': '70000111',
             'number': '7',
             'shiftNumber': '167',
             'numberInShift': '8',
             'printedDate': '2022-10-03T17:19:46',
             'total': '310.0000',
             'version': '1.05',
             'receiptKISExid': '0234435605',
             'attribute_1': '1234567890',
             'city_id': 'BC29FE50'}]
    # spark.read.json infers one schema across all records, so keys that are
    # missing from a record come back as null
    df = spark.read.option("multiline", "true").json(sc.parallelize(data))
    df.show(truncate=False)
    +----------+-----------+------------------------------------+----------------------------------------------------------------------------------------------------------------+--------+------+-------------+-------------------+--------------+-----------+--------+-----+--------+------+-------+----------+--------+
    |IsEReceipt|attribute_1|attribute_2                         |attribute_3                                                                                                     |city_id |number|numberInShift|printedDate        |receiptKISExid|shiftNumber|systemID|taxID|total   |typeID|version|workerExID|workerID|
    +----------+-----------+------------------------------------+----------------------------------------------------------------------------------------------------------------+--------+------+-------------+-------------------+--------------+-----------+--------+-----+--------+------+-------+----------+--------+
    |false     |1234567890 |65e2b71b-c2de-4681-9cf1-29701f5ce6bb|null                                                                                                            |BC29FE50|5     |6            |2022-10-03T15:38:09|023442935     |167        |617914  |1    |990.0000|1     |1.05   |70000111  |1011778 |
    |false     |null       |null                                |00000000-0000-0000-0000-000000000000@@00000000-0000-0000-0000-000000000000@@00000000-0000-0000-0000-000000000000|BC29FE50|6     |7            |2022-10-03T16:48:35|0234434052    |167        |617915  |1    |640.0000|1     |1.05   |70000111  |1011778 |
    |null      |1234567890 |null                                |null                                                                                                            |BC29FE50|7     |8            |2022-10-03T17:19:46|0234435605    |167        |617916  |1    |310.0000|1     |1.05   |70000111  |1011778 |
    +----------+-----------+------------------------------------+----------------------------------------------------------------------------------------------------------------+--------+------+-------------+-------------------+--------------+-----------+--------+-----+--------+------+-------+----------+--------+
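
A note on passing Python dicts straight to `spark.read.json`: as far as I can tell, PySpark converts non-string RDD elements to text with `str()`, which yields a single-quoted Python repr. That only parses because Spark's JSON reader accepts single quotes by default (the `allowSingleQuotes` option). Serializing each dict with `json.dumps` first produces standard JSON. The plain-Python sketch below shows the difference on a hypothetical shortened record; it does not call Spark itself.

```python
import json

# Hypothetical shortened record with a few of the keys from the data above
record = {'systemID': '617916', 'attribute_1': '1234567890', 'city_id': 'BC29FE50'}

as_repr = str(record)         # Python repr: keys and values in single quotes
as_json = json.dumps(record)  # standard JSON: double quotes

print(as_repr)  # {'systemID': '617916', 'attribute_1': '1234567890', 'city_id': 'BC29FE50'}
print(as_json)  # {"systemID": "617916", "attribute_1": "1234567890", "city_id": "BC29FE50"}

# A strict JSON parser rejects the repr but round-trips the json.dumps output
try:
    json.loads(as_repr)
    repr_is_valid_json = True
except json.JSONDecodeError:
    repr_is_valid_json = False

print(repr_is_valid_json)             # False
print(json.loads(as_json) == record)  # True
```

With Spark, the same idea would read `df = spark.read.json(sc.parallelize(data).map(json.dumps))`. The inferred columns come back in alphabetical order (which is why `IsEReceipt` is first in the output above), so `df.select(*ORDER_COL_NAMES)` can restore the column order from the question.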
