我在spark中使用以下消费代码阅读Kafka主题:
val df = spark
.read
.format("kafka")
.option("kafka.bootstrap.servers", kafkaBrokers)
.option("subscribe", topicName)
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]
代码按预期从主题中读取内容,但主题的内容不会因此而被清除。重复执行会导致同一组消息一次又一次地返回。
我应该怎么做才能使邮件在阅读时从主题中删除?
2条答案
按热度按时间k4aesqcs1#
正如crikcet_所提到的,kafka不会在消耗之后移除原木。您可以使用基于大小的策略或基于时间的设置来管理kafka中的日志保留。
log.retention.bytes
-删除日志之前日志的最大大小log.retention.hours
-删除日志文件之前保留日志文件的小时数log.retention.minutes
-保留日志文件的分钟数log.retention.ms
-保留日志文件的毫秒数您可以在这里阅读有关这些参数的更多信息
除此之外,处理日志保留的其他机制是日志压缩。通过设置以下参数,可以管理日志压缩
你可以在这里了解更多
m3eecexj2#
kafka在使用时不会删除主题消息
您的spark代码是kafka消费组的一部分,它需要确认消息已被读取,并提交这些偏移量,我相信spark在默认情况下会定期执行这些操作,但是您可以通过设置选项
enable.auto.commit
至false
,这是强烈建议的,因为您将希望控制spark是否已成功处理记录集合。检查点或提交到持久存储的偏移量是在任务重新启动/失败时保留偏移量的一些方法,而不是重新读取相同的数据