当spark消费者读到Kafka主题时,它不会被冲淡

jpfvwuh4  于 2021-06-07  发布在  Kafka
关注(0)|答案(2)|浏览(286)

我在spark中使用以下消费代码阅读Kafka主题:

val df = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", kafkaBrokers)
  .option("subscribe", topicName)
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

代码按预期从主题中读取内容,但主题的内容不会因此而被清除。重复执行会导致同一组消息一次又一次地返回。
我应该怎么做才能使邮件在阅读时从主题中删除?

k4aesqcs

k4aesqcs1#

正如crikcet_所提到的,kafka不会在消耗之后移除原木。您可以使用基于大小的策略或基于时间的设置来管理kafka中的日志保留。 log.retention.bytes -删除日志之前日志的最大大小 log.retention.hours -删除日志文件之前保留日志文件的小时数 log.retention.minutes -保留日志文件的分钟数 log.retention.ms -保留日志文件的毫秒数
您可以在这里阅读有关这些参数的更多信息
除此之外,处理日志保留的其他机制是日志压缩。通过设置以下参数,可以管理日志压缩

log.cleanup.policy

log.cleaner.min.compaction.lag.ms

你可以在这里了解更多

m3eecexj

m3eecexj2#

kafka在使用时不会删除主题消息
您的spark代码是kafka消费组的一部分,它需要确认消息已被读取,并提交这些偏移量,我相信spark在默认情况下会定期执行这些操作,但是您可以通过设置选项 enable.auto.commitfalse ,这是强烈建议的,因为您将希望控制spark是否已成功处理记录集合。
检查点或提交到持久存储的偏移量是在任务重新启动/失败时保留偏移量的一些方法,而不是重新读取相同的数据

相关问题