目前,我们正在将Spark Dataframe 转换为JSON字符串,以发送给Kafka。
在此过程中,我们对JSON执行了两次操作,为内部JSON插入了。
代码片段:
val df=spark.sql("select * from dB.tbl")
val bus_dt="2022-09-23"
case class kafkaMsg(busDate:String,msg:String)
假设我的df有2列ID,Status,这将构成我的Kafka消息的内部json。
JSON是为msg创建的,并应用于Case类。
val rdd=df.toJSON.rdd.map(msg=>kafkaMsg(busDate,msg))
此步骤的输出:
kafkaMsg(2022-09-23,{"id":1,"status":"active"})
现在,为了将busDate和msg作为JSON发送给Kafka,再次应用了toJSON。
val df1=spark.createDataFrame(rdd).toJSON
输出为:
{"busDate":"2022-09-23","msg":"{"id":1,"status":"active"}"}
内部JSON正在拥有,这不是消费者所期望的。
预期的JSON:
{"busDate":"2022-09-23","msg":{"id":1,"status":"active"}}
我怎么才能在没有\的情况下创建这个json并发送给Kafka。
请注意,msg值会有所不同,不能Map到架构。
1条答案
按热度按时间yqhsw0fo1#
您的
msg
被转义,因为它已经是一个字符串。因此,当您转换为JSON时,您是在对字符串进行字符串处理...JSON可以表示为
Map[String, ?]
,所以如果您的输入数据还没有模式,请定义一个模式。以PySpark为例。
架构-请注意,
msg
不是字符串,而是Map[String, String]
。另外,您不能有多个值类型-Spark SQL and MapType with string keys and any values作为JSON-你不需要Jackson,也不需要和RDDS打交道。
不是逃跑..。
请注意,
id
类型已转换。如果这不是您想要的,那么您需要使用msg : StructType
而不是MapType
,并给出id : IntegerType
。(显然,这假设 Dataframe 中的所有记录都是一致键入的)您也可以取出密钥(切换到使用
spark.sql.functions
)然后,您可以像往常一样使用
kafkaDf.write.format("kafka")
或者,如果您希望将字符串信息 Package 在单个字段中,而不是键-值对中,那么您的Kafka使用者将需要自己处理这一问题,例如对记录和内部字符串(JSON值)进行双重反序列化。