在这个问题的基础上,如何将Dataframe的所有列写入Kafka主题。
目前我有一个带有一些列的Dataframe,我应该用一个键将其写入kafka,因此我从旧的Dataframe创建一个新的Dataframe,并指定键和值:
val endDf: DataFrame = midDf.withColumn("key",lit(keyval)).withColumn("value",lit(testVal))
现在,当我写这封信给Kafka时,我特别指出:
endDf.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.write
.format("kafka")
.option("kafka.bootstrap.servers", "test:8808")
.option("topic", "topic1")
.save()
如果值是单列,则此操作有效。但是初始Dataframe是由多个列组成的,我需要用json格式编写所有这些列。
我该如何将所有列作为 value
. 我觉得它是围绕着使用 interDf.columns
以及 to_json
1条答案
按热度按时间vyu0f0g11#
Kafka期待一把钥匙和一个价值观;因此,必须使用
to_json()
: