使用Spark自定义输出文件格式

4uqofj5v  于 2023-10-23  发布在  Apache
关注(0)|答案(2)|浏览(161)

我有一个要求,写以下输出格式。

primary_key_value^attribute1:value1;attribute2:value2;attribute3:value3;attribute4:value4

输出将被写入一个文件。我可以手动concat值并将其变成一个字符串。有没有什么最佳实践可以让Spark写入此输出

wmvff8tz

wmvff8tz1#

你可以用concatconcat_ws来添加列的名称,并写分号作为分隔符。在scala中,它看起来像这样:

val df = Seq((0, "val1", "val2", "val3")).toDF("id", "col1", "col2", "col3")
val res = df
  .select(df.columns.map(c => concat_ws(":", lit(c), col(c)).alias(c)) : _*)
res.show()
+----+---------+---------+---------+
|  id|     col1|     col2|     col3|
+----+---------+---------+---------+
|id:0|col1:val1|col2:val2|col3:val3|
+----+---------+---------+---------+

然后:

res.write.option("sep", ";").csv("...")
5vf7fwbs

5vf7fwbs2#

在Pyspark中,对于每个列,您可以使用concat函数来连接列名及其值,并在select运算符中应用所有这些。
在你用csv函数写了这个之后:

df.select(* [f.concat(col, f.lit(":"), f.lit(col)) for col in df.columns] ).write.option("header", "false").option("delimiter", ";").csv("../path")

相关问题