我这里有2个JSON字符串每条记录。
第一个JSON字符串是变量,指示第二个JSON字符串中要替换的列。
例如输入。
import spark.implicits._
// Leaving timestamp aspect out and multiple occurrence of changes in same run, for simplicity.
val df = Seq((1, """{"b": "new", "c": "new"}""", """{"k": 1, "b": "old", "c": "old", "d": "old"}""" ),
(2, """{"b": "new", "d": "new"}""", """{"k": 2, "b": "old", "c": "old", "d": "old"}""" )).toDF("id", "chg", "before")
就像这样:
+---+------------------------+--------------------------------------------+
|id |chg |before |
+---+------------------------+--------------------------------------------+
|1 |{"b": "new", "c": "new"}|{"k": 1, "b": "old", "c": "old", "d": "old"}|
|2 |{"b": "new", "d": "new"}|{"k": 2, "b": "old", "c": "old", "d": "old"}|
+---+------------------------+--------------------------------------------+
我想要的是-但我不知道如何做,甚至不确定是否可能-是这个输出,因此在每行的基础上”。
+---+--------------------------------------------+
|id |after |
+---+------------------------+-------------------+
|1 |{"k": 1, "b": "new", "c": "new", "d": "old"}|
|2 |{"k": 2, "b": "new", "c": "old", "d": "new"}|
+---+------------------------+-------------------+
如果可以的话,这就有点意思了。我们可以在JSON中有一个数组,这也很好。
1条答案
按热度按时间ozxc1zmp1#
您可以走强类型化的路线,这使您能够对希望发生的确切转换进行大量控制。
我做了如下假设:
before
列包含每行中的所有字段chg
列从不包含k
字段。这样,
output
数据集就包含了您想要的值。既然您要求了它,我们可以再次将其转换为json字符串(收到的意见:这在Spark IMO中不太有用,所以除非你绝对需要在某个地方输出这个json,否则我不会这么做),方法如下:希望这对你有帮助!