spark 3结构化流媒体使用kafka source中的maxoffsetspertrigger和trigger.once

nwnhqdif  于 2021-05-16  发布在  Spark
关注(0)|答案(1)|浏览(1023)

我们需要使用 maxOffsetsPerTrigger Kafka的资料来源 Trigger.Once() 在结构化流媒体中,但基于这个问题 allAvailable Spark3。在这种情况下,有没有办法达到利率上限?
以下是spark 3中的示例代码:

def options: Map[String, String] = Map(
  "kafka.bootstrap.servers" -> conf.getStringSeq("bootstrapServers").mkString(","),
  "subscribe" -> conf.getString("topic")
) ++
  Option(conf.getLong("maxOffsetsPerTrigger")).map("maxOffsetsPerTrigger" -> _.toString)
val streamingQuery = sparkSession.readStream.format("kafka").options(options)
  .load
  .writeStream
  .trigger(Trigger.Once)
  .start()
sg2wtvxw

sg2wtvxw1#

没有其他方法可以绕过它来适当地设置一个利率限制。如果 maxOffsetsPerTrigger 不适用于具有 Once 触发器您可以执行以下操作以获得相同的结果:
选择另一个触发器并使用 maxOffsetsPerTrigger 限制速率并在完成处理所有数据后手动终止此作业。
使用选项 startingOffsets 以及 endingOffsets 同时使作业成为批处理作业。重复此步骤,直到处理完主题中的所有数据。然而,正如这里所详述的,“runonce模式下的流媒体比批处理更好”是有原因的。
最后一个选择是查看链接的pull请求并自己编译spark。

相关问题