在使用scala spark写入kafka主题之前,为Dataframe定义架构

mzaanser  于 2023-02-15  发布在  Spark
关注(0)|答案(0)|浏览(232)

我有以下dataframe(finaldataframe)模式

  1. root
  2. |-- sentence: string (nullable = true)
  3. |-- category: string (nullable = true)
  4. |-- Id: string (nullable = true)

我定义了以下模式

  1. def defineS3SinkSchema() : StructType = {
  2. new StructType()
  3. .add("payload", new StructType()
  4. .add("sentence", StringType)
  5. .add("Id", LongType)
  6. .add("category", StringType)
  7. )
  8. }

我想使用上面的模式对上面定义的Dataframe进行修改,并写入一个Kafka主题。但我不知道如何将已定义的模式与Dataframe集成。下面是写Kafka的代码主题。

  1. val jsonFormatData = finalDataFrame.select(col("key").cast("string").alias("key"),
  2. to_json(struct(
  3. col("sentence"),
  4. col("category"),
  5. col("key").as("Id")
  6. )).alias("value"))
  7. jsonFormatData.printSchema()
  8. val writeStream = jsonFormatData
  9. .writeStream
  10. .format("kafka")
  11. .option("kafka.bootstrap.servers", hostAddress)
  12. .option("topic", "myTopic.val")
  13. .option("checkpointLocation", "test_path")
  14. .start()
  15. writeStream.awaitTermination()

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题