spring集成kafka出站适配器错误句柄

ffdz8vbo  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(373)

我如何处理 Spring 整合中未能产生给Kafka的信息?
我没有看到“error channel”是“int”的一个选项-kafka:outbound-channel-adapter,不知道应该在哪里添加错误通道信息,以便我的errorhandler可以获得“failed to produce to kafka”类型的错误(包括所有类型的故障、配置、网络等)
另外,inputokafka是队列通道,我应该在哪里添加错误通道来处理潜在的队列满错误?

<int:gateway id="myGateway" 
            service-interface="someGateway" 
            default-request-channel="transformChannel" 
            error-channel="errorChannel"  
            default-reply-channel="replyChannel" 
            async-executor="MyThreadPoolTaskExecutor"/>

<int:transformer id="transformer" input-channel="transformChannel" method="transform" output-channel="inputToKafka">
    <bean class="Transformer"/>  
</int:transformer>

<int-kafka:outbound-channel-adapter id="kafkaOutboundChannelAdapter"
                                    kafka-template="template"
                                    auto-startup="false"
                                    channel="inputToKafka"
                                    topic="foo"
                                    message-key-expression="'bar'"
                                    partition-id-expression="2">
    <int:poller fixed-delay="200" time-unit="MILLISECONDS" receive-timeout="0"
                    task-executor="kafkaExecutor"/>
</int-kafka:outbound-channel-adapter>

<bean id="kafkaExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
    ....
</bean>

<bean id="template" class="org.springframework.kafka.core.KafkaTemplate">
    <constructor-arg>
        <bean class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
            <constructor-arg>
                <map>
                    <entry key="bootstrap.servers" value="localhost:9092" />
                    ...
                </map>
            </constructor-arg>
        </bean>
    </constructor-arg>
</bean>

<int:service-activator input-channel='errorChannel' output-channel="replyChannel" method='process'>
    <bean class="ErrorHandler"/>
</int:service-activator>

编辑

<property name="producerListener">
    <bean id="producerListener" class="org.springframework.kafka.support.ProducerListenerAdapter"/>
</property>
z9ju0rcb

z9ju0rcb1#

下游流量上的任何错误都将发送到 error-channel 在你的网关上。但是,由于kafka在默认情况下是异步的,因此不会出现任何错误。你可以设置 sync=true 在出站适配器上,然后如果出现问题,将引发异常。
不过,请记住,这会慢得多。
您可以通过添加 ProducerListener 给你的 KafkaTemplate .

相关问题