Sending JSON events to Kafka in a non-stringified format

Posted by xwbd5t1u on 2021-05-16 in Spark

I have created a DataFrame like the one below, using the to_json() method to build the JSON array value.

    +-----------------------------------------------------------------------------------------------------------+
    |json_data                                                                                                  |
    +-----------------------------------------------------------------------------------------------------------+
    |{"name":"sensor1","value-array":[{"time":"2020-11-27T01:01:00.000Z","sensorvalue":11.0,"tag1":"tagvalue"}]}|
    +-----------------------------------------------------------------------------------------------------------+
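For reference, a json_data column like this one can be built from flat per-reading rows by collecting the readings into an array of structs and calling to_json only once, on the whole record. A minimal sketch, assuming a hypothetical flat input with columns name, time, sensorvalue and tag1 (the question does not show the original schema):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, collect_list, struct, to_json}

val spark = SparkSession.builder().master("local[1]").appName("build-json-data").getOrCreate()
import spark.implicits._

// Hypothetical flat input: one row per sensor reading (schema assumed, not taken from the question).
val readings = Seq(
  ("sensor1", "2020-11-27T01:01:00.000Z", 11.0, "tagvalue")
).toDF("name", "time", "sensorvalue", "tag1")

// Collect each sensor's readings into an array<struct>, then serialize the whole record once.
val outgoingDF = readings
  .groupBy(col("name"))
  .agg(collect_list(struct(col("time"), col("sensorvalue"), col("tag1"))).as("value-array"))
  .select(to_json(struct(col("name"), col("value-array"))).as("json_data"))

outgoingDF.show(false)
```

Because value-array stays a real array column until the final to_json, it comes out as a JSON array rather than an escaped string.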

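I send the DataFrame to a Kafka topic as shown below. However, when I consume the data from that topic, I can see that the JSON data has been stringified.
Code that pushes the data to Kafka: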

    outgoingDF.selectExpr("CAST(Key as STRING) as key", "to_json(struct(*)) AS value")
      .write
      .format("kafka")
      .option("topic", "topic_test")
      .option("kafka.bootstrap.servers", "localhost:9093")
      .option("checkpointLocation", checkpointPath)
      .option("kafka.sasl.mechanism", "PLAIN")
      .option("kafka.security.protocol", "SASL_SSL")
      .option("truncate", false)
      .save()

Stringified data received from Kafka:

    {
      "name": "sensor1",
      "value-array": "[{\"time\":\"2020-11-27T01:01:00.000Z\",\"sensorvalue\":11.0,\"tag1\":\"tagvalue\"}]"
    }
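This output is what you would expect if value-array had already been turned into a JSON string (for example by its own to_json call) before the outer record was serialized: the outer to_json then sees a plain string and quotes and escapes it. A minimal sketch contrasting the two cases, assuming hypothetical flat columns name, time, sensorvalue and tag1:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{array, col, struct, to_json}

val spark = SparkSession.builder().master("local[1]").appName("double-encoding").getOrCreate()
import spark.implicits._

// Hypothetical flat input row.
val flat = Seq(("sensor1", "2020-11-27T01:01:00.000Z", 11.0, "tagvalue"))
  .toDF("name", "time", "sensorvalue", "tag1")

val readings = array(struct(col("time"), col("sensorvalue"), col("tag1")))

// Double-encoded: the array becomes a JSON *string* first, so the outer to_json quotes and escapes it.
val doubleEncoded = flat
  .withColumn("value-array", to_json(readings))
  .select(to_json(struct(col("name"), col("value-array"))).as("value"))

// Encoded once: the array stays an array<struct> until the single, final to_json.
val encodedOnce = flat
  .withColumn("value-array", readings)
  .select(to_json(struct(col("name"), col("value-array"))).as("value"))

doubleEncoded.show(false)
encodedOnce.show(false)
```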

How can we send the data to the Kafka topic so that the output is not stringified JSON?

Answer 1, by z2acfund

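json_data is already of type string, and you are passing it through to_json(struct("*")) again, so the JSON text is serialized a second time as a quoted, escaped string.
Check the value column that is sent to Kafka: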

    df.withColumn("value",to_json(struct($"*"))).show(false)
    +-----------------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------+
    |json_data                                                                                                  |value                                                                                                                                      |
    +-----------------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------+
    |{"name":"sensor1","value-array":[{"time":"2020-11-27T01:01:00.000Z","sensorvalue":11.0,"tag1":"tagvalue"}]}|{"json_data":"{\"name\":\"sensor1\",\"value-array\":[{\"time\":\"2020-11-27T01:01:00.000Z\",\"sensorvalue\":11.0,\"tag1\":\"tagvalue\"}]}"}|
    +-----------------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------+
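Because json_data already holds serialized JSON, another option is to skip the second to_json entirely and send json_data as the Kafka value unchanged. A minimal sketch, assuming an outgoingDF with the Key and json_data columns implied by the question (the Key value here is hypothetical); the Kafka write itself needs the spark-sql-kafka-0-10 package and a reachable broker, so it is only defined, not called:

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.col

val spark = SparkSession.builder().master("local[1]").appName("send-json-as-is").getOrCreate()
import spark.implicits._

// Stand-in for the question's outgoingDF; the Key value is hypothetical.
val outgoingDF = Seq(
  ("sensor1", """{"name":"sensor1","value-array":[{"time":"2020-11-27T01:01:00.000Z","sensorvalue":11.0,"tag1":"tagvalue"}]}""")
).toDF("Key", "json_data")

// json_data already holds serialized JSON, so it becomes the Kafka value as-is (no second to_json).
val kafkaDF = outgoingDF.select(
  col("Key").cast("string").as("key"),
  col("json_data").as("value")
)

// Batch write using the connection options from the question (defined only, not invoked here).
def writeToKafka(df: DataFrame): Unit =
  df.write
    .format("kafka")
    .option("topic", "topic_test")
    .option("kafka.bootstrap.servers", "localhost:9093")
    .option("kafka.sasl.mechanism", "PLAIN")
    .option("kafka.security.protocol", "SASL_SSL")
    .save()

kafkaDF.show(false)
```

With a broker available, writeToKafka(kafkaDF) would publish the original JSON text without the extra {"json_data": ...} wrapper.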

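Try the code below. It builds value-array as a real array of structs and calls to_json only once, on the final record.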

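    import org.apache.spark.sql.functions._
    import spark.implicits._ // enables the $"column" syntax

    // Assumes df still has the flat columns Key, name, time, sensorvalue and tag1,
    // i.e. the data before it was serialized into json_data.
    df
      .withColumn("value-array", array(struct($"time", $"sensorvalue", $"tag1")))
      // selectExpr only accepts SQL strings, so use select to combine the cast key with the to_json Column
      .select($"Key".cast("string").as("key"), to_json(struct($"name", $"value-array")).as("value"))
      .write
      .format("kafka")
      .option("topic", "topic_test")
      .option("kafka.bootstrap.servers", "localhost:9093")
      .option("checkpointLocation", checkpointPath)
      .option("kafka.sasl.mechanism", "PLAIN")
      .option("kafka.security.protocol", "SASL_SSL")
      .option("truncate", false)
      .save()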
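The code above relies on df still having the flat columns. If only the json_data string column is available and you need to work with its fields, one alternative is to parse it back into a struct with from_json and serialize it exactly once. A minimal sketch; the schema is inferred from the sample record and is an assumption:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, from_json, to_json}
import org.apache.spark.sql.types._

val spark = SparkSession.builder().master("local[1]").appName("parse-then-serialize").getOrCreate()
import spark.implicits._

val original =
  """{"name":"sensor1","value-array":[{"time":"2020-11-27T01:01:00.000Z","sensorvalue":11.0,"tag1":"tagvalue"}]}"""

// Assumed schema, inferred from the sample record.
val schema = StructType(Seq(
  StructField("name", StringType),
  StructField("value-array", ArrayType(StructType(Seq(
    StructField("time", StringType),
    StructField("sensorvalue", DoubleType),
    StructField("tag1", StringType)
  ))))
))

val df = Seq(original).toDF("json_data")

// Parse the string into a real struct (fields could be modified here), then serialize it once.
val valueDF = df
  .select(from_json(col("json_data"), schema).as("record"))
  .select(to_json(col("record")).as("value"))

valueDF.show(false)
```

If no fields need changing, parsing and re-serializing is redundant; json_data can be used as the value directly.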
