Best way to process a huge JSON in parallel with Python

von4xj4u asked on 2021-07-14 in Spark

I have a huge JSON, about 5 GB in size, with 2 million records. I'm trying to merge some records based on a condition. In the example below, I try to create one record per section and add all of the student info as a nested JSON. I also want to split the subjects field and convert it into an array. There are many more operations like this (a plain-Python sketch of the intended transformation follows the expected output below).
Original JSON:

  1. [{"section": "abc-abc-abc", "student_id": "ss-23235", "subjects": "physics;maths;chemistry"},
  2. {"section": "abc-abc-abc", "student_id": "ss-33237", "subjects": "physics;maths;chemistry"},
  3. {"section": "xyz-xyz-xyz", "student_id": "ss-13632", "subjects": "physics;maths;chemistry"},
  4. {"section": "xyz-xyz-xyz", "student_id": "ss-13265", "subjects": "physics;maths;chemistry"}]

I want to convert it into the following form:

    [
        {
            "section": "abc-abc-abc",
            "students": [
                {
                    "student_id": "ss-23235",
                    "subjects": [
                        "physics",
                        "maths",
                        "chemistry"
                    ]
                },
                {
                    "student_id": "ss-33237",
                    "subjects": [
                        "physics",
                        "maths",
                        "chemistry"
                    ]
                }
            ]
        },
        {
            "section": "xyz-xyz-xyz",
            "students": [
                {
                    "student_id": "ss-13632",
                    "subjects": [
                        "physics",
                        "maths",
                        "chemistry"
                    ]
                },
                {
                    "student_id": "ss-13265",
                    "subjects": [
                        "physics",
                        "maths",
                        "chemistry"
                    ]
                }
            ]
        }
    ]
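
To make the transformation concrete, this is the grouping and subject-splitting in plain Python on the small sample above (the names records and result are just for illustration; this single-threaded version is what I need to scale to 5 GB):

    import itertools

    # records holds the parsed sample shown above
    records = [
        {"section": "abc-abc-abc", "student_id": "ss-23235", "subjects": "physics;maths;chemistry"},
        {"section": "abc-abc-abc", "student_id": "ss-33237", "subjects": "physics;maths;chemistry"},
        {"section": "xyz-xyz-xyz", "student_id": "ss-13632", "subjects": "physics;maths;chemistry"},
        {"section": "xyz-xyz-xyz", "student_id": "ss-13265", "subjects": "physics;maths;chemistry"},
    ]

    # itertools.groupby only groups consecutive items, so sort by the grouping key first
    records.sort(key=lambda r: r["section"])

    result = [
        {
            "section": section,
            "students": [
                {"student_id": r["student_id"], "subjects": r["subjects"].split(";")}
                for r in group
            ],
        }
        for section, group in itertools.groupby(records, key=lambda r: r["section"])
    ]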

I tried loading the data in Spark, building a list of the unique sections, and then processing them in Python with multiprocessing.pool as follows.

    from multiprocessing.pool import ThreadPool
    import itertools
    import json

    pool = ThreadPool(8)

    def process(section_part, student_df):
        # process section_part and store the merged records in a list
        processed_data_list = []
        for section_row in section_part:
            # each element of section_part is a Row with a "section" field
            students = student_df.filter(student_df.section == section_row.section)
            updated_info = students.first().asDict()
            nested_stu_list = []
            for student in students.collect()[1:]:
                ind_info = student.asDict()
                # process each record and store the data in ind_info
                # ind_info["subjects"]: ["physics", "maths", "chemistry"]
                nested_stu_list.append(ind_info)
            updated_info["students"] = nested_stu_list
            processed_data_list.append(updated_info)
        return processed_data_list

    uniq_section_list = student_df.select("section").distinct().collect()
    # create a list of lists with 10000 sections each
    section_parts = [uniq_section_list[i:i+10000] for i in range(0, len(uniq_section_list), 10000)]
    # process each sublist in a worker thread
    result_lists = pool.map(lambda part: process(part, student_df), section_parts)
    # merge all the result lists into one bigger list
    final_datalist = list(itertools.chain.from_iterable(result_lists))
    # save as a new json file
    with open('result.json', 'w') as fout:
        json.dump(final_datalist, fout)

I'm running this on 16 GB RAM and an 8-core CPU. For a sample of 200,000 records it takes more than 12 hours. What is the best way to do this quickly? I'm open to using any library.


wf82jlnq #1

You can use Spark to process and aggregate the JSON:

    import pyspark.sql.functions as F

    result = df.groupBy('section').agg(
        F.collect_list(
            F.struct(
                'student_id',
                F.split('subjects', ';').alias('subjects')
            )
        ).alias('students')
    )

    result.show(truncate=False)
    +-----------+----------------------------------------------------------------------------------+
    |section    |students                                                                          |
    +-----------+----------------------------------------------------------------------------------+
    |xyz-xyz-xyz|[[ss-13632, [physics, maths, chemistry]], [ss-13265, [physics, maths, chemistry]]]|
    |abc-abc-abc|[[ss-23235, [physics, maths, chemistry]], [ss-33237, [physics, maths, chemistry]]]|
    +-----------+----------------------------------------------------------------------------------+

    result.coalesce(1).write.json('result')
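
For completeness, a sketch of the pieces around that snippet. The input file name students.json is an assumption, and multiLine is needed because the input is one big JSON array rather than JSON Lines. Note that write.json produces a directory of JSON Lines part files; if you need a single JSON array file like the expected output and the aggregated result fits in driver memory, you can collect it and dump it with the json module:

    import json
    import pyspark.sql.functions as F
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("merge-students").getOrCreate()

    # multiLine=True because the 5 GB input is a single JSON array, not JSON Lines
    df = spark.read.option("multiLine", True).json("students.json")

    result = df.groupBy("section").agg(
        F.collect_list(
            F.struct("student_id", F.split("subjects", ";").alias("subjects"))
        ).alias("students")
    )

    # to get one JSON array file like the expected output
    # (only do this if the aggregated result fits on the driver)
    with open("result.json", "w") as fout:
        json.dump([row.asDict(recursive=True) for row in result.collect()], fout)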
