根据dataframe中的条件向kafka主题发送数据

qoefvg9y  于 2021-07-12  发布在  Spark
关注(0)|答案(2)|浏览(359)

我想根据sparkstreaming中数据的值更改kafka主题目的地以保存数据。有可能再来一次吗?当我尝试下面的代码时,它只执行第一个,而不执行下面的进程。

(testdf 
.filter(f.col("value") == "A")
.selectExpr("CAST(value as STRING) as value")
.writeStream
.format("kafka")
.option("checkpointLocation", "/checkpoint_1")
.option("kafka.bootstrap.servers","~~:9092")
.option("topic", "test")
.option("startingOffsets", "latest")
.start()
      )

(testdf 
.filter(f.col("value") == "B")
.selectExpr("CAST(value as STRING) as value")
.writeStream
.format("kafka")
.option("checkpointLocation", "/checkpoint_2")
.option("kafka.bootstrap.servers","~~:9092")
.option("topic", "testB")
.option("startingOffsets", "latest")
.start()
      )

数据存储在主题名测试中。有人能想出一个办法吗?
我更改了目的地以保存这样的Dataframe。

|type|value|
| A  |testvalue|
| B  |testvalue|

a型主题测试。键入b到主题testb。

fcwjkofz

fcwjkofz1#

使用spark的最新版本,您可以创建一个列 topic 在Dataframe中,用于将记录定向到相应的主题。
对你来说,这意味着你可以做

testdf 
  .withColumn("topic", when(f.col("value") == "A", lit("test")).otherwise(lit("testB"))
  .selectExpr("CAST(value as STRING) as value", "topic") 
  .writeStream .format("kafka") 
  .option("checkpointLocation", "/checkpoint_1") 
  .option("kafka.bootstrap.servers","~~:9092")
  .start()
qf9go6mv

qf9go6mv2#

谢谢迈克。我可以通过运行以下代码来实现这一点!

(
testdf 
  .withColumn("topic",f.when(f.col("testTime") == "A", f.lit("test")).otherwise(("testB")))
  .selectExpr("CAST(value as STRING) as value", "topic") 
  .writeStream
  .format("kafka") 
  .option("checkpointLocation", "/checkpoint_2") 
  .option("startingOffsets", "latest")
  .option("kafka.bootstrap.servers","9092")
  .start()
)

相关问题