jms到kafka消息传输-端到端事务

gywdnpxw  于 2021-06-08  发布在  Kafka
关注(0)|答案(1)|浏览(415)

我使用以下代码使用来自jms activemq的消息:

<jms:message-driven-channel-adapter
        id="helloJMSAdapater" destination="helloJMSQueue" connection-factory="jmsConnectionfactory"
        channel="helloChannel" extract-payload="true" />
<integration:channel id="helloChannel" />

我的要求是从这里消费并将其发布到kafka出站适配器。使用以下配置:

<int-kafka:outbound-channel-adapter
            id="kafkaOutboundChannelAdapter"
            kafka-producer-context-ref="kafkaProducerContext"
            channel="inputToKafka">
</int-kafka:outbound-channel-adapter>

以下是我想要达到的目标:
我的队列是一个持久的主题,除非成功发布到kafka,否则我不想确认记录。简言之,我希望有一个事务行为,从使用来自jms的消息到将其发布到kafka。
我注意到我的消息会立即退出队列,如果处理过程遇到异常,我将无法重新处理它。我不想那样。
另外,当Kafka遇到一些问题时,我希望它被返回到某个方法,这样我就可以持久化失败消息,正如前面所说的,我不想承认它。
我真的很难让它发挥作用。有人能帮帮我吗?

ddarikpa

ddarikpa1#

你真的可以 transaction-manager<jms:message-driven-channel-adapter> 启动tx。
什么时候 <int-kafka:outbound-channel-adapter> 抛出一个异常,它会导致tx被回调,因此消息将重新发出。
如果您对持续存在的错误感兴趣,那么 error-channel 上的选项 <jms:message-driven-channel-adapter> ,但您仍然必须重新抛出异常才能让tx回调。
为了使所有这些工作,你应该确保只有一个线程从乞讨结束。不 <queue> 或者 executor 水流中的通道。
另外,也不清楚为什么你还用这么老的ApacheKafka。。。

相关问题