使用python并行处理巨大json的最佳方法

von4xj4u  于 2021-07-14  发布在  Spark
关注(0)|答案(1)|浏览(573)

我有一个大约5gb大小的巨大json和200万条记录。我正试图根据一个条件合并一些记录。在下面的示例中,我尝试为每个部分创建一条记录,并将所有学生信息添加为一个嵌套的json。我也想分手 subject 字段并将其转换为数组。这样的行动还有很多。
原始json:

[{"section": "abc-abc-abc", "student_id": "ss-23235", "subjects": "physics;maths;chemistry"},
{"section": "abc-abc-abc", "student_id": "ss-33237", "subjects": "physics;maths;chemistry"},
{"section": "xyz-xyz-xyz", "student_id": "ss-13632", "subjects": "physics;maths;chemistry"},
{"section": "xyz-xyz-xyz", "student_id": "ss-13265", "subjects": "physics;maths;chemistry"}]

我想把它转换成下面的形式

[
   {
      "section":"abc-abc-abc",
      "students":[
         {
            "student_id":"ss-23235",
            "subjects":[
               "physics",
               "maths",
               "chemistry"
            ]
         },
         {
            "student_id":"ss-33237",
            "subjects":[
               "physics",
               "maths",
               "chemistry"
            ]
         }
      ]
   },
   {
      "section":"xyz-xyz-xyz",
      "students":[
         {
            "student_id":"ss-13632",
            "subjects":[
               "physics",
               "maths",
               "chemistry"
            ]
         },
         {
            "student_id":"ss-13265",
            "subjects":[
               "physics",
               "maths",
               "chemistry"
            ]
         }
      ]
   }
]

我尝试在spark中加载数据,然后使用python在列表中创建唯一的会话 multiprocessing.pool 开始如下处理。

from multiprocessing.pool import ThreadPool
pool = ThreadPool(8)      

def process(section_part, student_df):
    # process section_part and store in a list  
    processed_data_list = [] 
    for section_id in section_part:       
        students = student_df.filter(student_df.section == section_id)
        updated_info = students.first().asDict()
        nested_stu_list = []
        for student in students.collect()[1:]: 
             ind_info = student.asDict()
            # process each records and store the data in ind_info
            # ind_info["subjects"]: ["physics", "maths", "chemistry"]
            nested_stu_list.append(ind_info)
        updated_info["students"] = nested_stu_list
    processed_data_list.append(updated_info)
    return processed_data_list     

uniq_section_list = student_df.select("section").distinct().collect()  

# create a list of lists with 10000 sections

section_parts = [uniq_section_list[i:i+10000] for i in range(0, len(uniq_section_list), 10000)]        

# Using lambda process each sublists

result_lists = pool.map(lambda part: process(part), section_parts)

# Merge all the result list into one bigger list

final_datalist = list(itertools.chain.from_iterable(result_lists))

# save as new json file

with open('result.json', 'w') as fout:
     json.dump(final_datalist, fout)

我正在运行这个 16GB RAM 以及 8 Core CPU . 作为一个样本 200000 记录这需要12个多小时。快速实现这一目标的最佳方法是什么?我愿意使用任何图书馆。

wf82jlnq

wf82jlnq1#

您可以使用spark来处理和聚合json:

import pyspark.sql.functions as F

result = df.groupBy('section').agg(
    F.collect_list(
        F.struct(
            'student_id', 
            F.split('subjects', ';').alias('subjects')
        )
    ).alias('students')
)

result.show(truncate=False)
+-----------+----------------------------------------------------------------------------------+
|section    |students                                                                          |
+-----------+----------------------------------------------------------------------------------+
|xyz-xyz-xyz|[[ss-13632, [physics, maths, chemistry]], [ss-13265, [physics, maths, chemistry]]]|
|abc-abc-abc|[[ss-23235, [physics, maths, chemistry]], [ss-33237, [physics, maths, chemistry]]]|
+-----------+----------------------------------------------------------------------------------+

result.coalesce(1).write.json('result')

相关问题