如何仅在至少有n行时执行流处理?

voj3qocg  于 2021-06-07  发布在  Kafka
关注(0)|答案(2)|浏览(457)

我在kafka使用者处有以下spark sql/streaming查询,当批大小达到特定的大小n时,我如何指定fetch应该是有条件的,否则使用者应该在处理之前缓冲元素,所以每当我想执行我的逻辑时,就保证我有一个精确的 Dataset<VideoEventData> 大小为n。当前代码:

Dataset<VideoEventData> ds = spark
      .readStream()
      .format("kafka")
      .option("kafka.bootstrap.servers", prop.getProperty("kafka.bootstrap.servers"))
      .option("subscribe", prop.getProperty("kafka.topic"))
      .option("kafka.max.partition.fetch.bytes", prop.getProperty("kafka.max.partition.fetch.bytes"))
      .option("kafka.max.poll.records", prop.getProperty("kafka.max.poll.records"))
      .load()
      .selectExpr("CAST(value AS STRING) as message")
      .select(functions.from_json(functions.col("message"),schema).as("json"))
      .select("json.*")
      .as(Encoders.bean(VideoEventData.class));
fcwjkofz

fcwjkofz1#

我想执行我的逻辑,保证我有一个大小为n的精确数据集
这在spark结构化流媒体(和spark一般)中是不可能的。
您有以下选项:
使用kafka使用者属性配置位于kafka源后面的kafka使用者。
作为任意有状态聚合的一部分,自己缓冲行。
编写一个自定义源来处理缓冲本身。
为2。我可以使用keyvaluegroupeddataset.flatmapgroupswithstate和一个状态,该状态将在“块”上累积,最终得到大小n。
三个月。实现一个定制的有状态流媒体源 getOffset 以及 getBatch 在某种程度上 getOffset 只有在至少 N 排。
免责声明:我自己从来没有做过这两种解决方案,但它们看起来是可行的。

cgh8pdjw

cgh8pdjw2#

您可以通过配置kafka消费者本身来实现这一点。套 fetch.min.bytes 你想要的最小值。这将告诉Kafka等到它有足够的数据。
有一个相关的设置, fetch.max.wait.ms ,它控制着Kafka最多要等多久。默认情况下,该值为500 ms。你可以在这里读更多。

相关问题