如何在pyspark结构化流媒体中使用maxoffsetspertrigger?

8cdiaqws  于 2021-06-08  发布在  Kafka
关注(0)|答案(1)|浏览(416)

我想限制从Kafka获取数据时的速率。我的代码看起来像:

df = spark.read.format('kafka') \
        .option("kafka.bootstrap.servers",'...')\
        .option("subscribe",'A') \
        .option("startingOffsets",'''{"A":{"0":200,"1":200,"2":200}}''') \
        .option("endingOffsets",'''{"A":{"0":400,"1":400,"2":400}}''') \
        .option("maxOffsetsPerTrigger",20) \
        .load() \
        .cache()

但是当我打电话的时候 df.count() ,结果是600。我期望的是20岁。有人知道为什么“maxoffsetspertrigger”不起作用吗。

nbysray5

nbysray51#

您将为每个分区(0、1、2)带来200条记录,总共是600条记录。
如您所见:
使用maxoffsetspertrigger选项限制每个触发器要获取的记录数。
这意味着,对于每个触发器或获取进程,kafka将获得20条记录,但总的来说,您仍将获取配置中设置的总记录(每个分区200条)。

相关问题