在Scala中从Spark Dataframe 创建JSON

5lhxktic  于 2022-10-07  发布在  Scala
关注(0)|答案(1)|浏览(191)

目前,我们正在将Spark Dataframe 转换为JSON字符串,以发送给Kafka。

在此过程中,我们对JSON执行了两次操作,为内部JSON插入了。

代码片段:

val df=spark.sql("select * from dB.tbl")

val bus_dt="2022-09-23" 
case class kafkaMsg(busDate:String,msg:String)

假设我的df有2列ID,Status,这将构成我的Kafka消息的内部json。

JSON是为msg创建的,并应用于Case类。

val rdd=df.toJSON.rdd.map(msg=>kafkaMsg(busDate,msg))

此步骤的输出:

kafkaMsg(2022-09-23,{"id":1,"status":"active"})

现在,为了将busDate和msg作为JSON发送给Kafka,再次应用了toJSON。

val df1=spark.createDataFrame(rdd).toJSON

输出为:

{"busDate":"2022-09-23","msg":"{"id":1,"status":"active"}"}

内部JSON正在拥有,这不是消费者所期望的。

预期的JSON:

{"busDate":"2022-09-23","msg":{"id":1,"status":"active"}}

我怎么才能在没有\的情况下创建这个json并发送给Kafka。

请注意,msg值会有所不同,不能Map到架构。

yqhsw0fo

yqhsw0fo1#

您的msg被转义,因为它已经是一个字符串。因此,当您转换为JSON时,您是在对字符串进行字符串处理...

JSON可以表示为Map[String, ?],所以如果您的输入数据还没有模式,请定义一个模式。

以PySpark为例。

scm = StructType([
    StructField('busDate', StringType(), nullable=False),
    StructField('msg', MapType(StringType(), StringType()), nullable=False)
])
sdf = spark.createDataFrame([
  ('2022-09-23', {"id":1,"status":"active"}),
], schema=scm)

架构-请注意,msg不是字符串,而是Map[String, String]。另外,您不能有多个值类型-Spark SQL and MapType with string keys and any values

root
 |-- busDate: string (nullable = false)
 |-- msg: map (nullable = false)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)

作为JSON-你不需要Jackson,也不需要和RDDS打交道。

kafkaDf = sdf.selectExpr("to_json(struct(*)) as value")
kafkaDf.show(truncate=False)

不是逃跑..。

请注意,id类型已转换。如果这不是您想要的,那么您需要使用msg : StructType而不是MapType,并给出id : IntegerType。(显然,这假设 Dataframe 中的所有记录都是一致键入的)

+-----------------------------------------------------------+
|value                                                      |
+-----------------------------------------------------------+
|{"busDate":"2022-09-23","msg":{"id":"1","status":"active"}}|
+-----------------------------------------------------------+

您也可以取出密钥(切换到使用spark.sql.functions)

kafkaDf = sdf.select(
    f.col("msg.id").cast("int").alias('key'), 
    f.to_json(f.struct('*')).alias('value')
)
kafkaDf.printSchema()
kafkaDf.show(truncate=False)
root
 |-- key: integer (nullable = true)
 |-- value: string (nullable = true)

+---+-----------------------------------------------------------+
|key|value                                                      |
+---+-----------------------------------------------------------+
|1  |{"busDate":"2022-09-23","msg":{"id":"1","status":"active"}}|
+---+-----------------------------------------------------------+

然后,您可以像往常一样使用kafkaDf.write.format("kafka")

或者,如果您希望将字符串信息 Package 在单个字段中,而不是键-值对中,那么您的Kafka使用者将需要自己处理这一问题,例如对记录和内部字符串(JSON值)进行双重反序列化。

相关问题