我有一个应用程序,它从Kafka获取数据并将其保存到数据库。我的代码如下所示:
spark.readStream
.format("kafka")
.options(options)
.load()
.writeStream
.trigger(Trigger.ProcessingTime(20000))
.foreachBatch({ (batch: DataFrame, _: Long) =>
val rowsCount = batch.count
saveBatch(batch)
println(s"Saved $rowsCount rows")
})
.start()
字符串
在Spark UI中,我查看Structure Streaming选项卡,看到我的流的处理速率是每秒10万行。如果我像这样删除行数:
.foreachBatch({ (batch: DataFrame, _: Long) =>
saveBatch(batch)
})
.start()
型
如你所见,在第一种情况下,我没有使用缓存,当我计算批处理中的行数并将批处理保存到数据库时,我可能从Kafka读取数据两次(而不是第二种情况,当阅读只有一次)我不能相信,这样的琐碎操作,如行计数导致额外的阅读从Kafka,它是任何方法,在不缓存数据的情况下获取批处理长度?
1条答案
按热度按时间ttisahbt1#
@OneCricketeer你说的对,Spark不会多次读取Kafka。我查了DAG方案,两种情况下都只有一个“MicroBatchScan”阶段。另外我比较了Kafka主机上的网络利用率,也没有什么区别(输出是相同的)。看起来结构流选项卡中的“处理率”意味着处理的行的精确计数,在批处理期间,但这并不影响阅读Kafka