我使用以下代码使用来自jms activemq的消息:
<jms:message-driven-channel-adapter
id="helloJMSAdapater" destination="helloJMSQueue" connection-factory="jmsConnectionfactory"
channel="helloChannel" extract-payload="true" />
<integration:channel id="helloChannel" />
我的要求是从这里消费并将其发布到kafka出站适配器。使用以下配置:
<int-kafka:outbound-channel-adapter
id="kafkaOutboundChannelAdapter"
kafka-producer-context-ref="kafkaProducerContext"
channel="inputToKafka">
</int-kafka:outbound-channel-adapter>
以下是我想要达到的目标:
我的队列是一个持久的主题,除非成功发布到kafka,否则我不想确认记录。简言之,我希望有一个事务行为,从使用来自jms的消息到将其发布到kafka。
我注意到我的消息会立即退出队列,如果处理过程遇到异常,我将无法重新处理它。我不想那样。
另外,当Kafka遇到一些问题时,我希望它被返回到某个方法,这样我就可以持久化失败消息,正如前面所说的,我不想承认它。
我真的很难让它发挥作用。有人能帮帮我吗?
1条答案
按热度按时间ddarikpa1#
你真的可以
transaction-manager
上<jms:message-driven-channel-adapter>
启动tx。什么时候
<int-kafka:outbound-channel-adapter>
抛出一个异常,它会导致tx被回调,因此消息将重新发出。如果您对持续存在的错误感兴趣,那么
error-channel
上的选项<jms:message-driven-channel-adapter>
,但您仍然必须重新抛出异常才能让tx回调。为了使所有这些工作,你应该确保只有一个线程从乞讨结束。不
<queue>
或者executor
水流中的通道。另外,也不清楚为什么你还用这么老的ApacheKafka。。。