KafkaOffsetAutoFrangeException

qxgroojn  于 2021-06-07  发布在  Kafka
关注(0)|答案(2)|浏览(175)

我通过Kafka传输大量数据。然后,我有Spark流,这是消费这些信息。基本上,spark streaming抛出了以下错误:

kafka.common.OffsetOutOfRangeException

现在我知道这个错误意味着什么。所以我把保留期改为5天。但是我还是遇到了同样的问题。然后,我列出了一个主题的所有消息,从kafka开始。毫无疑问,Kafka流媒体部分开始时的大量消息都不存在,而且由于spark流媒体稍微落后于Kafka流媒体部分,spark流媒体尝试使用已被Kafka删除的消息。不过,我认为更改保留策略可以解决以下问题:

--add-config retention.ms=....

我怀疑的是,Kafka正在删除主题中的消息,以便为新消息腾出空间(因为我们正在传输大量数据)。我是否可以配置一个属性来指定在删除之前的消息之前kafka可以存储多少字节的数据?

mklgxw1f

mklgxw1f1#

使用“主题配置”属性创建主题时,可以设置主题的最大大小 retention.bytes 通过控制台,如:

bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic my-topic --partitions 1  --replication-factor 1 --config retention.bytes=10485760 --config

或者可以使用全局代理配置属性 log.retention.bytes 设置所有主题的最大大小。
重要的是要知道 log.retention.bytes 不强制对主题大小进行硬限制,但它只是向Kafka发出信号,告诉他何时开始删除最旧的消息

nc1teljy

nc1teljy2#

解决此问题的另一种方法是在配置中指定spark参数:

spark.streaming.kafka.maxRatePerPartition

相关问题