我想根据sparkstreaming中数据的值更改kafka主题目的地以保存数据。有可能再来一次吗?当我尝试下面的代码时,它只执行第一个,而不执行下面的进程。
(testdf
.filter(f.col("value") == "A")
.selectExpr("CAST(value as STRING) as value")
.writeStream
.format("kafka")
.option("checkpointLocation", "/checkpoint_1")
.option("kafka.bootstrap.servers","~~:9092")
.option("topic", "test")
.option("startingOffsets", "latest")
.start()
)
(testdf
.filter(f.col("value") == "B")
.selectExpr("CAST(value as STRING) as value")
.writeStream
.format("kafka")
.option("checkpointLocation", "/checkpoint_2")
.option("kafka.bootstrap.servers","~~:9092")
.option("topic", "testB")
.option("startingOffsets", "latest")
.start()
)
数据存储在主题名测试中。有人能想出一个办法吗?
我更改了目的地以保存这样的Dataframe。
|type|value|
| A |testvalue|
| B |testvalue|
a型主题测试。键入b到主题testb。
2条答案
按热度按时间fcwjkofz1#
使用spark的最新版本,您可以创建一个列
topic
在Dataframe中,用于将记录定向到相应的主题。对你来说,这意味着你可以做
qf9go6mv2#
谢谢迈克。我可以通过运行以下代码来实现这一点!