创建从pyspark到hdfs的json批处理输出

nbysray5  于 2021-06-25  发布在  Hive
关注(0)|答案(0)|浏览(255)

我正在使用pyspark将一些配置单元表格数据转换为json文档,并将输出写入hdfs以供下游使用。代码snippt如下所示:

def convertToJson(x):
    data = ....... #transformation_code
    output = row(str(json.dumps(data))
    return output
df1 = spark.sql("""select * from json_ready_tbl""")
rdd1 = df.rdd.map(lambda x: convertToJson(x)).saveAsTextFile('/hdfs/output/dir')

converttojson函数的输出json是嵌套的,但在较高的级别上如下所示:

{
  "id": 1001,
  "info": {
     "count": 12345,
     "code": 999
      }
}

所以基本上,输出文件的每一行都有一个json字符串。然后,我们在文本文件的顶部创建一个配置单元表,方法是向每行添加一些元数据,如日期和客户机名称,以供进一步的下游使用,如下所示:

date         client_name  json_obj
2020-04-01   zeus         { "id": 1001, "info": { "count": 12345, "code": 999 } }
...
N number of rows

现在的挑战是,我们必须将这些最多1000个json文件批处理到一个json数组中,并写入hdfs,因此hive表中的输出应该如下所示:

date         client_name  json_batch
2020-04-01   zeus         [{obj1}, {obj2}, {obj3},...{obj1000}]
...
T number of rows

我怎样才能做到这一点?感谢您的帮助。谢谢。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题