使用kafkamessagedriven通道适配器时出错

mwg9r5ms  于 2021-06-08  发布在  Kafka
关注(0)|答案(1)|浏览(384)

我在用Kafka和Kafka交谈。
下面是我为使用messagedrivenchannel适配器所做的配置。

<!-- kafka MessageDriven Channel Adapter for ProcessEvent -->
    <int-kafka:message-driven-channel-adapter
        listener-container="listnerContainer" payload-decoder="kafkaReflectionDecoder"
        key-decoder="kafkaReflectionDecoder" channel="storeOffsetsChannel"
        auto-startup="true"/>

    <bean id="zkConfiguration"
        class="org.springframework.integration.kafka.core.ZookeeperConfiguration">
        <constructor-arg ref="zookeeperConnect"></constructor-arg>
    </bean>
    <bean id="kafkaConnectionFactory"
        class="org.springframework.integration.kafka.core.DefaultConnectionFactory">
        <constructor-arg ref="zkConfiguration"></constructor-arg>
    </bean>
    <bean id="listnerContainer"
        class="org.springframework.integration.kafka.listener.KafkaMessageListenerContainer">
        <constructor-arg ref="kafkaConnectionFactory"></constructor-arg>
        <constructor-arg value="${listed.accounts.topic}"></constructor-arg>
    </bean>

<!-- Zookeeper connect needed for Kafka Consumer -->
    <int-kafka:zookeeper-connect id="zookeeperConnect"
        zk-connect="${app.zookeeper.servers}" zk-connection-timeout="6000"
        zk-session-timeout="6000" zk-sync-time="2000" />

当我开始我的应用程序时,有时应用程序不会以以下错误开始。但有时应用程序启动良好

2015-09-03 11:53:32.647 ERROR 28883 --- [           main] o.s.boot.SpringApplication               : Application startup failed

org.springframework.context.ApplicationContextException: Failed to start bean 'org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter#0'; nested exception is kafka.common.KafkaException: fetching topic metadata for topics [Set(fulfillment.payments.autopay.listeddueaccounts)] from broker [List()] failed
    at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:176)
    at org.springframework.context.support.DefaultLifecycleProcessor.access$200(DefaultLifecycleProcessor.java:51)
    at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:346)
    at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:149)
    at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:112)
    at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:770)
    at org.springframework.boot.context.embedded.EmbeddedWebApplicationContext.finishRefresh(EmbeddedWebApplicationContext.java:140)
    at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:483)
    at org.springframework.boot.context.embedded.EmbeddedWebApplicationContext.refresh(EmbeddedWebApplicationContext.java:118)
    at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:686)
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:320)
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:957)
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:946)
    at com.capitalone.payments.autopay.autopayprocesstransrecon.AutopayProcessTransactionRecon.main(AutopayProcessTransactionRecon.java:26)
Caused by: kafka.common.KafkaException: fetching topic metadata for topics [Set(fulfillment.payments.autopay.listeddueaccounts)] from broker [List()] failed
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:72)
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:93)
    at org.springframework.integration.kafka.core.DefaultConnectionFactory.refreshMetadata(DefaultConnectionFactory.java:178)
    at org.springframework.integration.kafka.core.DefaultConnectionFactory.getPartitions(DefaultConnectionFactory.java:221)
    at org.springframework.integration.kafka.listener.KafkaMessageListenerContainer$GetPartitionsForTopic.safeValueOf(KafkaMessageListenerContainer.java:611)
    at org.springframework.integration.kafka.listener.KafkaMessageListenerContainer$GetPartitionsForTopic.safeValueOf(KafkaMessageListenerContainer.java:600)
    at com.gs.collections.impl.block.function.checked.CheckedFunction.valueOf(CheckedFunction.java:30)
    at com.gs.collections.impl.utility.ArrayIterate.flatCollect(ArrayIterate.java:933)
    at com.gs.collections.impl.utility.ArrayIterate.flatCollect(ArrayIterate.java:919)
    at org.springframework.integration.kafka.listener.KafkaMessageListenerContainer.getPartitionsForTopics(KafkaMessageListenerContainer.java:332)
    at org.springframework.integration.kafka.listener.KafkaMessageListenerContainer.start(KafkaMessageListenerContainer.java:294)
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.doStart(KafkaMessageDrivenChannelAdapter.java:137)
    at org.springframework.integration.endpoint.AbstractEndpoint.start(AbstractEndpoint.java:94)
    at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:173)
    ... 13 common frames omitted

我的消息驱动适配器配置是否错误?
另外,我还有int-kafka:inbound-channel-adapter configured 在我的应用程序中,该适配器的配置如下:

<int-kafka:inbound-channel-adapter
    id="kafkaInboundChannelAdapter" channel="domainEventChannel"
    kafka-consumer-context-ref="consumerContext" auto-startup="true">
    <int:poller fixed-delay="10" time-unit="MILLISECONDS"></int:poller>
</int-kafka:inbound-channel-adapter>

<int-kafka:consumer-context id="consumerContext"
    zookeeper-connect="zookeeperConnect" consumer-properties="consumerProperties"
    consumer-timeout="1000">
    <int-kafka:consumer-configurations>
        <int-kafka:consumer-configuration
            group-id="autopayProcessTransactionRecon" value-decoder="kafkaReflectionDecoder"
            key-decoder="kafkaReflectionDecoder" max-messages="1">
            <int-kafka:topic streams="2" id="${domain.event.topic}" />
        </int-kafka:consumer-configuration>
    </int-kafka:consumer-configurations>
</int-kafka:consumer-context>

<!-- Zookeeper connect needed for Kafka Consumer -->
<int-kafka:zookeeper-connect id="zookeeperConnect"
    zk-connect="${app.zookeeper.servers}" zk-connection-timeout="6000"
    zk-session-timeout="6000" zk-sync-time="2000" />
uplii1fm

uplii1fm1#

基于
代理[list()]
作为跟踪的一部分,您似乎没有配置主机/代理ip。您可以考虑使用不同的配置实现作为kafkanconnectionfactory的构造函数参数,例如。

<bean id="brokerConfiguration" class="org.springframework.integration.kafka.core.BrokerAddressListConfiguration">
            <constructor-arg>
                <bean class="org.springframework.integration.kafka.core.BrokerAddress">
                    <constructor-arg type="java.lang.String" value="localhost"/>
                </bean>
            </constructor-arg>
        </bean>
        <bean id="kafkaConnectionFactory" class="org.springframework.integration.kafka.core.DefaultConnectionFactory">
            <constructor-arg ref="brokerConfiguration"></constructor-arg>
        </bean>

相关问题