我正在使用pyspark将一些配置单元表格数据转换为json文档,并将输出写入hdfs以供下游使用。代码snippt如下所示:
def convertToJson(x):
data = ....... #transformation_code
output = row(str(json.dumps(data))
return output
df1 = spark.sql("""select * from json_ready_tbl""")
rdd1 = df.rdd.map(lambda x: convertToJson(x)).saveAsTextFile('/hdfs/output/dir')
converttojson函数的输出json是嵌套的,但在较高的级别上如下所示:
{
"id": 1001,
"info": {
"count": 12345,
"code": 999
}
}
所以基本上,输出文件的每一行都有一个json字符串。然后,我们在文本文件的顶部创建一个配置单元表,方法是向每行添加一些元数据,如日期和客户机名称,以供进一步的下游使用,如下所示:
date client_name json_obj
2020-04-01 zeus { "id": 1001, "info": { "count": 12345, "code": 999 } }
...
N number of rows
现在的挑战是,我们必须将这些最多1000个json文件批处理到一个json数组中,并写入hdfs,因此hive表中的输出应该如下所示:
date client_name json_batch
2020-04-01 zeus [{obj1}, {obj2}, {obj3},...{obj1000}]
...
T number of rows
我怎样才能做到这一点?感谢您的帮助。谢谢。
暂无答案!
目前还没有任何答案,快来回答吧!