我正在使用spark结构化流媒体和Kafka主题的阅读。目标是将消息写入多个表的postgresql数据库。
消息架构为:
root
|-- id: string (nullable = true)
|-- name: timestamp (nullable = true)
|-- comment: string (nullable = true)
|-- map_key_value: map (nullable = true)
|-- key: string
|-- value: string (valueContainsNull = true)
在删除map\u key\u值后写入一个表时,使用以下代码:
我的代码是:
message.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
batchDF.write.format("jdbc").option("url", "url")
.option("user", "username")
.option("password", "password")
.option(JDBCOptions.JDBC_TABLE_NAME, "table_1')
.mode(SaveMode.Append).save();
}.outputMode(OutputMode.Append()).start().awaitTermination()
我想将消息写入表1(id、name、comment)和表2的两个db表中,表2需要有map\u key\u值。
1条答案
按热度按时间z0qdvdin1#
您将需要n个流查询n个接收器;t1和t2都算作一个单独的接收器。
writestream当前不写入jdbc,因此应该使用foreachbatch运算符。