I am using Spring Integration Kafka to talk to Kafka.
Below is the configuration I use for the message-driven-channel-adapter.
<!-- kafka MessageDriven Channel Adapter for ProcessEvent -->
<int-kafka:message-driven-channel-adapter
    listener-container="listnerContainer" payload-decoder="kafkaReflectionDecoder"
    key-decoder="kafkaReflectionDecoder" channel="storeOffsetsChannel"
    auto-startup="true"/>
<bean id="zkConfiguration"
    class="org.springframework.integration.kafka.core.ZookeeperConfiguration">
    <constructor-arg ref="zookeeperConnect"></constructor-arg>
</bean>
<bean id="kafkaConnectionFactory"
    class="org.springframework.integration.kafka.core.DefaultConnectionFactory">
    <constructor-arg ref="zkConfiguration"></constructor-arg>
</bean>
<bean id="listnerContainer"
    class="org.springframework.integration.kafka.listener.KafkaMessageListenerContainer">
    <constructor-arg ref="kafkaConnectionFactory"></constructor-arg>
    <constructor-arg value="${listed.accounts.topic}"></constructor-arg>
</bean>
<!-- Zookeeper connect needed for Kafka Consumer -->
<int-kafka:zookeeper-connect id="zookeeperConnect"
    zk-connect="${app.zookeeper.servers}" zk-connection-timeout="6000"
    zk-session-timeout="6000" zk-sync-time="2000" />
When I start my application, it sometimes fails to start with the following error, but at other times it starts fine.
2015-09-03 11:53:32.647 ERROR 28883 --- [ main] o.s.boot.SpringApplication : Application startup failed
org.springframework.context.ApplicationContextException: Failed to start bean 'org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter#0'; nested exception is kafka.common.KafkaException: fetching topic metadata for topics [Set(fulfillment.payments.autopay.listeddueaccounts)] from broker [List()] failed
at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:176)
at org.springframework.context.support.DefaultLifecycleProcessor.access$200(DefaultLifecycleProcessor.java:51)
at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:346)
at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:149)
at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:112)
at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:770)
at org.springframework.boot.context.embedded.EmbeddedWebApplicationContext.finishRefresh(EmbeddedWebApplicationContext.java:140)
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:483)
at org.springframework.boot.context.embedded.EmbeddedWebApplicationContext.refresh(EmbeddedWebApplicationContext.java:118)
at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:686)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:320)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:957)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:946)
at com.capitalone.payments.autopay.autopayprocesstransrecon.AutopayProcessTransactionRecon.main(AutopayProcessTransactionRecon.java:26)
Caused by: kafka.common.KafkaException: fetching topic metadata for topics [Set(fulfillment.payments.autopay.listeddueaccounts)] from broker [List()] failed
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:72)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:93)
at org.springframework.integration.kafka.core.DefaultConnectionFactory.refreshMetadata(DefaultConnectionFactory.java:178)
at org.springframework.integration.kafka.core.DefaultConnectionFactory.getPartitions(DefaultConnectionFactory.java:221)
at org.springframework.integration.kafka.listener.KafkaMessageListenerContainer$GetPartitionsForTopic.safeValueOf(KafkaMessageListenerContainer.java:611)
at org.springframework.integration.kafka.listener.KafkaMessageListenerContainer$GetPartitionsForTopic.safeValueOf(KafkaMessageListenerContainer.java:600)
at com.gs.collections.impl.block.function.checked.CheckedFunction.valueOf(CheckedFunction.java:30)
at com.gs.collections.impl.utility.ArrayIterate.flatCollect(ArrayIterate.java:933)
at com.gs.collections.impl.utility.ArrayIterate.flatCollect(ArrayIterate.java:919)
at org.springframework.integration.kafka.listener.KafkaMessageListenerContainer.getPartitionsForTopics(KafkaMessageListenerContainer.java:332)
at org.springframework.integration.kafka.listener.KafkaMessageListenerContainer.start(KafkaMessageListenerContainer.java:294)
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.doStart(KafkaMessageDrivenChannelAdapter.java:137)
at org.springframework.integration.endpoint.AbstractEndpoint.start(AbstractEndpoint.java:94)
at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:173)
... 13 common frames omitted
Is my message-driven adapter configuration wrong?
In addition, I also have an int-kafka:inbound-channel-adapter configured in my application. Its configuration is as follows:
<int-kafka:inbound-channel-adapter
    id="kafkaInboundChannelAdapter" channel="domainEventChannel"
    kafka-consumer-context-ref="consumerContext" auto-startup="true">
    <int:poller fixed-delay="10" time-unit="MILLISECONDS"></int:poller>
</int-kafka:inbound-channel-adapter>
<int-kafka:consumer-context id="consumerContext"
    zookeeper-connect="zookeeperConnect" consumer-properties="consumerProperties"
    consumer-timeout="1000">
    <int-kafka:consumer-configurations>
        <int-kafka:consumer-configuration
            group-id="autopayProcessTransactionRecon" value-decoder="kafkaReflectionDecoder"
            key-decoder="kafkaReflectionDecoder" max-messages="1">
            <int-kafka:topic streams="2" id="${domain.event.topic}" />
        </int-kafka:consumer-configuration>
    </int-kafka:consumer-configurations>
</int-kafka:consumer-context>
<!-- Zookeeper connect needed for Kafka Consumer -->
<int-kafka:zookeeper-connect id="zookeeperConnect"
    zk-connect="${app.zookeeper.servers}" zk-connection-timeout="6000"
    zk-session-timeout="6000" zk-sync-time="2000" />
1 answer
uplii1fm #1
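Based on "broker [List()]" in the stack trace, it looks like no broker host/IP is configured: the list of brokers used for the metadata request is empty. You could consider using a different Configuration implementation as the constructor argument of kafkaConnectionFactory.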
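As a rough sketch of what that suggestion could look like (the answer's own example is not available here, so this is an assumption rather than the answerer's exact configuration): spring-integration-kafka 1.x also ships BrokerAddressListConfiguration, which takes broker addresses directly instead of looking them up in ZooKeeper. The host names, ports, and the bean id brokerConfiguration below are placeholders, and the constructor signatures should be checked against the library version in use.

```xml
<!-- Sketch only: an explicit broker list used in place of the zkConfiguration bean.
     Host names and ports are placeholders, not values taken from the question. -->
<bean id="brokerConfiguration"
    class="org.springframework.integration.kafka.core.BrokerAddressListConfiguration">
    <constructor-arg>
        <list>
            <!-- Assumes BrokerAddress has a (host, port) constructor -->
            <bean class="org.springframework.integration.kafka.core.BrokerAddress">
                <constructor-arg value="kafka-host-1"/>
                <constructor-arg value="9092"/>
            </bean>
            <bean class="org.springframework.integration.kafka.core.BrokerAddress">
                <constructor-arg value="kafka-host-2"/>
                <constructor-arg value="9092"/>
            </bean>
        </list>
    </constructor-arg>
</bean>

<!-- The connection factory now references the broker list instead of zkConfiguration -->
<bean id="kafkaConnectionFactory"
    class="org.springframework.integration.kafka.core.DefaultConnectionFactory">
    <constructor-arg ref="brokerConfiguration"></constructor-arg>
</bean>
```

With a fixed broker list, the connection factory no longer depends on what ZooKeeper reports at startup. That may explain the intermittent failure, though it is worth confirming that the brokers are registered in ZooKeeper under the path given by ${app.zookeeper.servers} before switching.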