我想使用kafka streams处理器api,在预定的标点符号或函数中每分钟生成一些消息。kafka流能保证这些消息只被写入输出主题一次吗?
我知道在kafka streams中可以只进行一次处理,因为它通过以下操作生成单个事务:
提交输入主题的偏移量
将结果写入输出主题
这个概念是否扩展到了处理器api中的标点符号或函数,对于这些标点符号或函数,没有需要提交的关联输入消息?
例如,这个标点符号函数迭代键值状态存储中的项。从存储中删除每个项目并转发到下游:
override def punctuate(timestamp: Long) : Unit =
store.all.asScala.foreach { keyValue =>
store.delete(keyValue.key)
context.forward(keyValue.key, keyValue.value)
}
存储区中的每条消息都应该在输出主题上出现一次,即使是在处理器出现故障并重新启动的情况下。
假设存储是持久的;它有一个Kafka变更日志主题的支持。标点器被安排在每分钟的挂钟时间。我已配置 processing.guarantee=exactly_once
在我的配置中。
1条答案
按热度按时间gk7wooem1#
如果你使用标点符号,语义也同样适用。
在引擎盖下,使用状态存储将要写入changelog主题(甚至删除-使用一些键和
null
值)在您的用例中,kafka流将从某个输入主题读取消息,并写入输出主题和某个changelog主题(状态存储上的操作)。
如果在kafka流中只启用一次,它将在事务模式下工作。使用事务-原子多分区写入-kafka流确保,当执行偏移提交时,结果被写入输出主题,状态存储也被刷新到代理上的changelog主题。上述操作是原子操作,所以若其中一个操作失败,应用程序将重新处理来自上一个偏移位置的消息。所有这些都会起作用,因为
Processor::process
以及Punctuator::punctuate(...)
在特定分区的单个线程中执行。更多详情请参见:
马蒂亚斯j。来自Kafka峰会的sax演示:https://kafka-summit.org/sessions/dont-repeat-introducing-exactly-semantics-apache-kafka
Realm 璋在合流网页上发表博文:https://www.confluent.io/blog/enabling-exactly-kafka-streams -有以下部分:
How Kafka Streams Guarantees Exactly-Once Processing
.