我有以下dataframe(finaldataframe)模式
root
|-- sentence: string (nullable = true)
|-- category: string (nullable = true)
|-- Id: string (nullable = true)
我定义了以下模式
def defineS3SinkSchema() : StructType = {
new StructType()
.add("payload", new StructType()
.add("sentence", StringType)
.add("Id", LongType)
.add("category", StringType)
)
}
我想使用上面的模式对上面定义的Dataframe进行修改,并写入一个Kafka主题。但我不知道如何将已定义的模式与Dataframe集成。下面是写Kafka的代码主题。
val jsonFormatData = finalDataFrame.select(col("key").cast("string").alias("key"),
to_json(struct(
col("sentence"),
col("category"),
col("key").as("Id")
)).alias("value"))
jsonFormatData.printSchema()
val writeStream = jsonFormatData
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", hostAddress)
.option("topic", "myTopic.val")
.option("checkpointLocation", "test_path")
.start()
writeStream.awaitTermination()
暂无答案!
目前还没有任何答案,快来回答吧!