我如何处理 Spring 整合中未能产生给Kafka的信息?
我没有看到“error channel”是“int”的一个选项-kafka:outbound-channel-adapter,不知道应该在哪里添加错误通道信息,以便我的errorhandler可以获得“failed to produce to kafka”类型的错误(包括所有类型的故障、配置、网络等)
另外,inputokafka是队列通道,我应该在哪里添加错误通道来处理潜在的队列满错误?
<int:gateway id="myGateway"
service-interface="someGateway"
default-request-channel="transformChannel"
error-channel="errorChannel"
default-reply-channel="replyChannel"
async-executor="MyThreadPoolTaskExecutor"/>
<int:transformer id="transformer" input-channel="transformChannel" method="transform" output-channel="inputToKafka">
<bean class="Transformer"/>
</int:transformer>
<int-kafka:outbound-channel-adapter id="kafkaOutboundChannelAdapter"
kafka-template="template"
auto-startup="false"
channel="inputToKafka"
topic="foo"
message-key-expression="'bar'"
partition-id-expression="2">
<int:poller fixed-delay="200" time-unit="MILLISECONDS" receive-timeout="0"
task-executor="kafkaExecutor"/>
</int-kafka:outbound-channel-adapter>
<bean id="kafkaExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
....
</bean>
<bean id="template" class="org.springframework.kafka.core.KafkaTemplate">
<constructor-arg>
<bean class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
<constructor-arg>
<map>
<entry key="bootstrap.servers" value="localhost:9092" />
...
</map>
</constructor-arg>
</bean>
</constructor-arg>
</bean>
<int:service-activator input-channel='errorChannel' output-channel="replyChannel" method='process'>
<bean class="ErrorHandler"/>
</int:service-activator>
编辑
<property name="producerListener">
<bean id="producerListener" class="org.springframework.kafka.support.ProducerListenerAdapter"/>
</property>
1条答案
按热度按时间z9ju0rcb1#
下游流量上的任何错误都将发送到
error-channel
在你的网关上。但是,由于kafka在默认情况下是异步的,因此不会出现任何错误。你可以设置sync=true
在出站适配器上,然后如果出现问题,将引发异常。不过,请记住,这会慢得多。
您可以通过添加
ProducerListener
给你的KafkaTemplate
.