我有一个要求,写以下输出格式。
primary_key_value^attribute1:value1;attribute2:value2;attribute3:value3;attribute4:value4
输出将被写入一个文件。我可以手动concat值并将其变成一个字符串。有没有什么最佳实践可以让Spark写入此输出
wmvff8tz1#
你可以用concat或concat_ws来添加列的名称,并写分号作为分隔符。在scala中,它看起来像这样:
concat
concat_ws
val df = Seq((0, "val1", "val2", "val3")).toDF("id", "col1", "col2", "col3") val res = df .select(df.columns.map(c => concat_ws(":", lit(c), col(c)).alias(c)) : _*) res.show()
+----+---------+---------+---------+ | id| col1| col2| col3| +----+---------+---------+---------+ |id:0|col1:val1|col2:val2|col3:val3| +----+---------+---------+---------+
然后:
res.write.option("sep", ";").csv("...")
5vf7fwbs2#
在Pyspark中,对于每个列,您可以使用concat函数来连接列名及其值,并在select运算符中应用所有这些。在你用csv函数写了这个之后:
select
df.select(* [f.concat(col, f.lit(":"), f.lit(col)) for col in df.columns] ).write.option("header", "false").option("delimiter", ";").csv("../path")
2条答案
按热度按时间wmvff8tz1#
你可以用
concat
或concat_ws
来添加列的名称,并写分号作为分隔符。在scala中,它看起来像这样:然后:
5vf7fwbs2#
在Pyspark中,对于每个列,您可以使用
concat
函数来连接列名及其值,并在select
运算符中应用所有这些。在你用csv函数写了这个之后: