在使用kafka streams的处理器api时,我使用如下内容:
context.forward(key,value)
context.commit()
实际上,我在这里做的是每分钟将一个状态从状态存储转发到接收器(使用init()方法中的context.schedule())。我不明白的是:
[键,值]我向前发送的对,然后执行commit()从状态存储中获取。它是根据我的特定逻辑从manynot顺序输入[key,value]对聚合的。每个这样的输出[key,value]对都是来自输入的几个未排序的[key,value]对的集合(kafka主题)。所以,我不明白kafka cluster和kafka streams lib如何知道原始输入[key,value]对和最终输出[key,value]之间的相关性。如果kafka不知道输入对和输出对之间的连接,它如何被事务 Package (fail-safe)。当我执行context.commit()时,实际提交的是什么?
谢谢!
1条答案
按热度按时间hvvq6cgz1#
详细解释这一切超出了我在这里的答案。
基本上,如果事务被提交,那么当前的输入主题偏移量和对kafka主题的所有写入都是以原子方式完成的。这意味着,在提交完成之前,所有挂起的写操作都会被刷新。
事务不需要知道您的实际业务逻辑。它们只是将输入主题的进度跟踪与输出主题的写入“同步”。
我建议阅读相应的博客文章,并观看一次关于Kafka的演讲,以了解更多细节:
博客:https://www.confluent.io/blog/exactly-once-semantics-are-possible-heres-how-apache-kafka-does-it/
博客:https://www.confluent.io/blog/enabling-exactly-kafka-streams/
谈话:https://www.confluent.io/kafka-summit-nyc17/resource/#exactly-一次性滑轨
谈话:https://www.confluent.io/kafka-summit-sf17/resource/#exactly-一次流处理Kafka流
顺便说一句:这是一个关于streams api中手动提交的问题。您应该考虑以下问题:如何使用kafka流手动提交?