我们有一些需求,在Kafka事务提交后,我们需要做一些资源清理。这是java spring应用程序,我们使用spring云流kafka库来实现kafka事务。我想知道,在事务提交后,是否有任何来自kafka事务协调器的钩子或回调,在那里应用程序可以运行一些逻辑?目前我们依赖于ProduceListener onSuccess方法来清理资源。但是最近我们意识到它是从代理回调的(从代理确认),而不是在事务提交之后。
b1payxdu1#
我没有看到任何这样的钩子,我检查了spring Kafka事务管理器的整个源代码,没有发现任何钩子,而且,来自代理的ack是记录正确提交的可靠性保证,因为它可能发生在您生成的生产者端,但在数据发送到代理之前进程崩溃,您释放了资源。我不完全确定您的流程是什么,但如果您希望在发送数据后释放资源,您可以这样做,因为生产者也有重试策略,它将重试发送消息,否则您当前所做的就足够了
1条答案
按热度按时间b1payxdu1#
我没有看到任何这样的钩子,我检查了spring Kafka事务管理器的整个源代码,没有发现任何钩子,而且,来自代理的ack是记录正确提交的可靠性保证,因为它可能发生在您生成的生产者端,但在数据发送到代理之前进程崩溃,您释放了资源。
我不完全确定您的流程是什么,但如果您希望在发送数据后释放资源,您可以这样做,因为生产者也有重试策略,它将重试发送消息,否则您当前所做的就足够了