在我发布我的问题之前,我要感谢gary和artem帮助我解决了我的问题,并且我能够成功地将jms中的消息发布到kafka,并且事务已经就绪。
现在,我面临着另一个问题,考验着当我的Kafka倒下时会发生什么。当kafka停止前几次重试时,kafka出站适配器抛出异常,消息返回到jms并一次又一次地重试。
但是,经过几次重试后,即使kafka关闭,消息也会从jms中出列,我得到以下异常:
2017-07-10 23:27:51.117 ERROR 16116 --- [enerContainer-1] o.s.k.support.LoggingProducerListener : Exception thrown when sending a message with key='null' and payload='Test JPMC' to topic test:
org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.
我的集成xml是:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:jms="http://www.springframework.org/schema/integration/jms"
xmlns:integration="http://www.springframework.org/schema/integration"
xmlns:int-kafka="http://www.springframework.org/schema/integration/kafka"
xmlns:task="http://www.springframework.org/schema/task"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration/jms
http://www.springframework.org/schema/integration/jms/spring-integration-jms.xsd
http://www.springframework.org/schema/integration/kafka
http://www.springframework.org/schema/integration/kafka/spring-integration-kafka.xsd">
<jms:message-driven-channel-adapter
id="helloJMSAdapater" container="requestListenerContainer"
channel="helloChannel" extract-payload="true" error-channel="errorChannel"/>
<integration:recipient-list-router
input-channel="errorChannel">
<integration:recipient channel="errorOutputChannel" />
<integration:recipient channel="rethrowChannel" />
</integration:recipient-list-router>
<jms:outbound-channel-adapter id="errorQueueChannelAdapter"
channel="errorOutputChannel" destination="errorQueue" connection-factory="jmsConnectionfactory"
delivery-persistent="true" explicit-qos-enabled="true" />
<int-kafka:outbound-channel-adapter
id="kafkaOutboundChannelAdapter" kafka-template="kafkaTemplate"
auto-startup="true" sync="true" channel="inputToKafka" topic="test">
</int-kafka:outbound-channel-adapter>
</beans>
我不想确认jms消息,除非它们成功地发布到kafka中。
是因为Kafka设置了一些默认参数吗?
我的Kafka配置如下:
@Configuration
@Component
public class KafkaConfig {
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");//this.brokerAddress);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
// set more properties
return new DefaultKafkaProducerFactory<>(props);
}
}
1条答案
按热度按时间f8rj6qna1#
那不是Kafka的问题。如果您说您的消息是“从jms出列”,请确保队列上的重新传递策略配置为无限。
例如,activemq故事如下:http://activemq.apache.org/redelivery-policy.html
maximumredeliveries 6设置消息在被视为毒丸并返回到代理以便进入死信队列之前被重新传递的最大次数。设置为-1表示无限次重新交付。