使用Spark Streaming从Kafka高效阅读

moiiocjp  于 2024-01-06  发布在  Apache
关注(0)|答案(1)|浏览(232)

我有一个应用程序,它从Kafka获取数据并将其保存到数据库。我的代码如下所示:

spark.readStream
  .format("kafka")
  .options(options)
  .load()
  .writeStream
  .trigger(Trigger.ProcessingTime(20000))
  .foreachBatch({ (batch: DataFrame, _: Long) =>
    val rowsCount = batch.count
    saveBatch(batch)
    println(s"Saved $rowsCount rows")
  })
  .start()

字符串
在Spark UI中,我查看Structure Streaming选项卡,看到我的流的处理速率是每秒10万行。如果我像这样删除行数:

.foreachBatch({ (batch: DataFrame, _: Long) =>
    saveBatch(batch)
  })
  .start()


如你所见,在第一种情况下,我没有使用缓存,当我计算批处理中的行数并将批处理保存到数据库时,我可能从Kafka读取数据两次(而不是第二种情况,当阅读只有一次)我不能相信,这样的琐碎操作,如行计数导致额外的阅读从Kafka,它是任何方法,在不缓存数据的情况下获取批处理长度?

ttisahbt

ttisahbt1#

@OneCricketeer你说的对,Spark不会多次读取Kafka。我查了DAG方案,两种情况下都只有一个“MicroBatchScan”阶段。另外我比较了Kafka主机上的网络利用率,也没有什么区别(输出是相同的)。看起来结构流选项卡中的“处理率”意味着处理的行的精确计数,在批处理期间,但这并不影响阅读Kafka

相关问题