I have a huge JSON file, about 5 GB in size, with 2 million records. I am trying to merge some of the records based on a condition. In the example below, I want to create one record per section and add all of the student information as a nested JSON. I also want to split the subjects field and turn it into an array. There are many more operations like this.
Original JSON:
[{"section": "abc-abc-abc", "student_id": "ss-23235", "subjects": "physics;maths;chemistry"},
{"section": "abc-abc-abc", "student_id": "ss-33237", "subjects": "physics;maths;chemistry"},
{"section": "xyz-xyz-xyz", "student_id": "ss-13632", "subjects": "physics;maths;chemistry"},
{"section": "xyz-xyz-xyz", "student_id": "ss-13265", "subjects": "physics;maths;chemistry"}]
I want to transform it into the following form:
[
  {
    "section": "abc-abc-abc",
    "students": [
      {
        "student_id": "ss-23235",
        "subjects": [
          "physics",
          "maths",
          "chemistry"
        ]
      },
      {
        "student_id": "ss-33237",
        "subjects": [
          "physics",
          "maths",
          "chemistry"
        ]
      }
    ]
  },
  {
    "section": "xyz-xyz-xyz",
    "students": [
      {
        "student_id": "ss-13632",
        "subjects": [
          "physics",
          "maths",
          "chemistry"
        ]
      },
      {
        "student_id": "ss-13265",
        "subjects": [
          "physics",
          "maths",
          "chemistry"
        ]
      }
    ]
  }
]
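In plain Python, the reshaping I am after looks roughly like this (an in-memory sketch on a tiny sample only; it obviously does not scale to the 5 GB file, and the variable names are just for illustration):

from collections import defaultdict

# Group records by section and split the semicolon-separated subjects string.
sample_records = [
    {"section": "abc-abc-abc", "student_id": "ss-23235", "subjects": "physics;maths;chemistry"},
    {"section": "abc-abc-abc", "student_id": "ss-33237", "subjects": "physics;maths;chemistry"},
]

grouped = defaultdict(list)
for rec in sample_records:
    grouped[rec["section"]].append(
        {"student_id": rec["student_id"], "subjects": rec["subjects"].split(";")}
    )

target = [{"section": sec, "students": studs} for sec, studs in grouped.items()]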
I tried loading the data into Spark, building a list of the unique sections, and then processing them with Python's multiprocessing.pool as shown below.
import itertools
import json
from multiprocessing.pool import ThreadPool

pool = ThreadPool(8)

def process(section_part, student_df):
    # process one chunk of sections and store the results in a list
    processed_data_list = []
    for row in section_part:
        section_id = row["section"]
        students = student_df.filter(student_df.section == section_id)
        updated_info = students.first().asDict()
        nested_stu_list = []
        for student in students.collect()[1:]:
            ind_info = student.asDict()
            # process each record and store the data in ind_info,
            # e.g. ind_info["subjects"] = ["physics", "maths", "chemistry"]
            nested_stu_list.append(ind_info)
        updated_info["students"] = nested_stu_list
        processed_data_list.append(updated_info)
    return processed_data_list

# student_df is the DataFrame loaded from the JSON file in Spark
uniq_section_list = student_df.select("section").distinct().collect()
# split the unique sections into sublists of 10,000 sections each
section_parts = [uniq_section_list[i:i + 10000] for i in range(0, len(uniq_section_list), 10000)]
# process each sublist in the thread pool
result_lists = pool.map(lambda part: process(part, student_df), section_parts)
# merge all the result lists into one big list
final_datalist = list(itertools.chain.from_iterable(result_lists))
# save as a new json file
with open('result.json', 'w') as fout:
    json.dump(final_datalist, fout)
I am running this on 16 GB of RAM and an 8-core CPU. For a sample of 200,000 records it already takes more than 12 hours. What is the best way to do this quickly? I am open to using any library.
1 Answer
You can do the processing and aggregation entirely in Spark:
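A sketch of that approach (it assumes a SparkSession named spark, the input file at students.json, and output written to a result_json directory; adjust the paths and read options to your setup):

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("merge-students").getOrCreate()

# multiLine is needed because the file is one big JSON array,
# not newline-delimited JSON records.
df = spark.read.option("multiLine", True).json("students.json")

result = (
    df
    # "physics;maths;chemistry" -> ["physics", "maths", "chemistry"]
    .withColumn("subjects", F.split(F.col("subjects"), ";"))
    # Nest each student's fields into a struct and collect them per section.
    .groupBy("section")
    .agg(F.collect_list(F.struct("student_id", "subjects")).alias("students"))
)

# Spark writes newline-delimited JSON (one section object per line),
# not a single top-level array.
result.write.mode("overwrite").json("result_json")

This keeps the grouping on the executors instead of running a filter/collect round trip to the driver once per section, which is where the ThreadPool version spends most of its time.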