我们需要使用 maxOffsetsPerTrigger
Kafka的资料来源 Trigger.Once()
在结构化流媒体中,但基于这个问题 allAvailable
Spark3。在这种情况下,有没有办法达到利率上限?
以下是spark 3中的示例代码:
def options: Map[String, String] = Map(
"kafka.bootstrap.servers" -> conf.getStringSeq("bootstrapServers").mkString(","),
"subscribe" -> conf.getString("topic")
) ++
Option(conf.getLong("maxOffsetsPerTrigger")).map("maxOffsetsPerTrigger" -> _.toString)
val streamingQuery = sparkSession.readStream.format("kafka").options(options)
.load
.writeStream
.trigger(Trigger.Once)
.start()
1条答案
按热度按时间sg2wtvxw1#
没有其他方法可以绕过它来适当地设置一个利率限制。如果
maxOffsetsPerTrigger
不适用于具有Once
触发器您可以执行以下操作以获得相同的结果:选择另一个触发器并使用
maxOffsetsPerTrigger
限制速率并在完成处理所有数据后手动终止此作业。使用选项
startingOffsets
以及endingOffsets
同时使作业成为批处理作业。重复此步骤,直到处理完主题中的所有数据。然而,正如这里所详述的,“runonce模式下的流媒体比批处理更好”是有原因的。最后一个选择是查看链接的pull请求并自己编译spark。