在scala中将Dataframe的多列写入kafka

8iwquhpp  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(386)

在这个问题的基础上,如何将Dataframe的所有列写入Kafka主题。
目前我有一个带有一些列的Dataframe,我应该用一个键将其写入kafka,因此我从旧的Dataframe创建一个新的Dataframe,并指定键和值:

val endDf: DataFrame = midDf.withColumn("key",lit(keyval)).withColumn("value",lit(testVal))

现在,当我写这封信给Kafka时,我特别指出:

endDf.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "test:8808")
  .option("topic", "topic1")
  .save()

如果值是单列,则此操作有效。但是初始Dataframe是由多个列组成的,我需要用json格式编写所有这些列。
我该如何将所有列作为 value . 我觉得它是围绕着使用 interDf.columns 以及 to_json

vyu0f0g1

vyu0f0g11#

Kafka期待一把钥匙和一个价值观;因此,必须使用 to_json() :

import org.apache.spark.sql.functions._

val value_col_names = endDf.columns.filter(_ != "yourKeyColumn") 

endDf.withColumnRenamed("yourKeyColumn", "key") \ 
     .withColumn("value", to_json(struct(value_col_names.map(col(_)):_*))) \
     .select("key", "value") \ 
     .write() \ 
     .format("kafka") \ 
     .option("kafka.bootstrap.servers", "test:8808") \ 
     .option("topic", "topic1") \ 
     .save()

相关问题