我正在尝试从kafka接收消息并用java方法处理它们。我附上的文件,可以帮助您了解和定位的问题。问题是我得到了错误:org.springframework.beans.factory.beancreationexception:创建名为“org.springframework.integration.config.consumerdpointfactorybean”的bean时出错,如下所示。
网站.xml
<?xml version="1.0" encoding="UTF-8"?>
<web-app xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://java.sun.com/xml/ns/javaee" xsi:schemaLocation="http://java.sun.com/xml/ns/javaee http://java.sun.com/xml/ns/javaee/web-app_3_0.xsd" id="WebApp_ID" version="3.0">
<display-name>cosmob</display-name>
<welcome-file-list>
<welcome-file>index.html</welcome-file>
<welcome-file>index.htm</welcome-file>
<welcome-file>index.jsp</welcome-file>
<welcome-file>default.html</welcome-file>
<welcome-file>default.htm</welcome-file>
<welcome-file>default.jsp</welcome-file>
</welcome-file-list>
<context-param>
<param-name>contextConfigLocation</param-name>
<param-value>/WEB-INF/beans.xml</param-value>
</context-param>
<context-param>
<param-name>contextConfigLocation</param-name>
<param-value>/WEB-INF/intergration-cfg-app.xml</param-value>
</context-param>
<servlet>
<servlet-name>CXFServlet</servlet-name>
<servlet-class>org.apache.cxf.transport.servlet.CXFServlet</servlet-class>
</servlet>
<servlet-mapping>
<servlet-name>CXFServlet</servlet-name>
<url-pattern>/rest/*</url-pattern>
</servlet-mapping>
<servlet>
<servlet-name>WsServlet</servlet-name>
<servlet-class>org.cosmob.ccc.web.service.WsServlet</servlet-class>
</servlet>
<servlet-mapping>
<servlet-name>WsServlet</servlet-name>
<url-pattern>/ws/positions</url-pattern>
</servlet-mapping>
<listener>
<listener-class>org.springframework.web.context.ContextLoaderListener</listener-class>
</listener>
</web-app>
integration-cfg-app.xml文件
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:int-kafka="http://www.springframework.org/schema/integration/kafka"
xmlns:task="http://www.springframework.org/schema/task"
xsi:schemaLocation="http://www.springframework.org/schema/integration/kafka http://www.springframework.org/schema/integration/kafka/spring-integration-kafka.xsd
http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd">
<int:channel id="inputFromKafka">
<int:queue/>
</int:channel>
<int-kafka:inbound-channel-adapter id="kafkaInboundChannelAdapter"
kafka-consumer-context-ref="consumerContext"
auto-startup="false"
channel="inputFromKafka">
<int:poller default="true" fixed-delay="1000" time-unit="MILLISECONDS" max-messages-per-poll="5"/>
</int-kafka:inbound-channel-adapter>
<int-kafka:consumer-context id="consumerContext"
consumer-timeout="1000"
zookeeper-connect="zookeeperConnect">
<int-kafka:consumer-configurations>
<int-kafka:consumer-configuration group-id="default"
max-messages="5000">
<int-kafka:topic id="test2" streams="2"/>
</int-kafka:consumer-configuration>
</int-kafka:consumer-configurations>
</int-kafka:consumer-context>
<int-kafka:zookeeper-connect id="zookeeperConnect" zk-connect="172.16.201.173:2181" zk-connection-timeout="6000"
zk-session-timeout="6000"
zk-sync-time="2000" />
<int:outbound-channel-adapter channel="inputFromKafka" ref="kafkaConsumer" method="processMessage" />
<bean id="kafkaConsumer" class="org.cosmob.kafka.KafkaConsumer"/>
</beans>
Kafka消费者.java
package org.cosmob.kafka;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
public class KafkaConsumer {
public KafkaConsumer() {
// TODO Auto-generated constructor stub
}
public void processMessage(Map<String, Map<Integer, List<byte[]>>> msgs) {
for (Map.Entry<String, Map<Integer, List<byte[]>>> entry : msgs
.entrySet()) {
System.out.println("Topic:" + entry.getKey());
ConcurrentHashMap<Integer, List<byte[]>> messages = (ConcurrentHashMap<Integer, List<byte[]>>) entry
.getValue();
System.out.println("\n****Partition: \n");
Set<Integer> keys = messages.keySet();
for (Integer i : keys)
System.out.println("p:"+i);
System.out.println("\n**************\n");
Collection<List<byte[]>> values = messages.values();
for (Iterator<List<byte[]>> iterator = values.iterator(); iterator
.hasNext();) {
List<byte[]> list = iterator.next();
for (byte[] object : list) {
String message = new String(object);
System.out.println("Message: " + message);
}
}
}
}
}
启动tomcat时出错
13:59:34.401 [localhost-startStop-1] DEBUG o.s.b.f.s.DefaultListableBeanFactory - Returning cached instance of singleton bean 'org.springframework.integration.handler.MethodInvokingMessageHandler#0'
13:59:34.404 [localhost-startStop-1] DEBUG o.s.b.f.s.DefaultListableBeanFactory - Invoking afterPropertiesSet() on bean with name 'org.springframework.integration.config.ConsumerEndpointFactoryBean#0'
13:59:34.404 [localhost-startStop-1] DEBUG o.s.b.f.s.DefaultListableBeanFactory - Returning cached instance of singleton bean 'inputFromKafka'
13:59:35.364 [localhost-startStop-1] WARN o.s.w.c.s.XmlWebApplicationContext - Exception encountered during context initialization - cancelling refresh attempt
org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'org.springframework.integration.config.ConsumerEndpointFactoryBean#0': Invocation of init method failed; nested exception is java.lang.IllegalArgumentException: No poller has been defined for endpoint 'org.springframework.integration.config.ConsumerEndpointFactoryBean#0', and no default poller is available within the context.
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1578) ~[spring-beans-4.2.1.RELEASE.jar:4.2.1.RELEASE]
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:545) ~[spring-beans-4.2.1.RELEASE.jar:4.2.1.RELEASE]
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:482) ~[spring-beans-4.2.1.RELEASE.jar:4.2.1.RELEASE]
at org.springframework.beans.factory.support.AbstractBeanFactory$1.getObject(AbstractBeanFactory.java:305) ~[spring-beans-4.2.1.RELEASE.jar:4.2.1.RELEASE]
at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:230) ~[spring-beans-4.2.1.RELEASE.jar:4.2.1.RELEASE]
at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:301) ~[spring-beans-4.2.1.RELEASE.jar:4.2.1.RELEASE]
at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:196) ~[spring-beans-4.2.1.RELEASE.jar:4.2.1.RELEASE]
at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:753) ~[spring-beans-4.2.1.RELEASE.jar:4.2.1.RELEASE]
at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:834) ~[spring-context-4.2.1.RELEASE.jar:4.2.1.RELEASE]
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:537) ~[spring-context-4.2.1.RELEASE.jar:4.2.1.RELEASE]
at org.springframework.web.context.ContextLoader.configureAndRefreshWebApplicationContext(ContextLoader.java:446) [spring-web-4.2.1.RELEASE.jar:4.2.1.RELEASE]
at org.springframework.web.context.ContextLoader.initWebApplicationContext(ContextLoader.java:328) [spring-web-4.2.1.RELEASE.jar:4.2.1.RELEASE]
at org.springframework.web.context.ContextLoaderListener.contextInitialized(ContextLoaderListener.java:107) [spring-web-4.2.1.RELEASE.jar:4.2.1.RELEASE]
at org.apache.catalina.core.StandardContext.listenerStart(StandardContext.java:4994) [catalina.jar:7.0.57]
at org.apache.catalina.core.StandardContext.startInternal(StandardContext.java:5492) [catalina.jar:7.0.57]
at org.apache.catalina.util.LifecycleBase.start(LifecycleBase.java:150) [catalina.jar:7.0.57]
at org.apache.catalina.core.ContainerBase$StartChild.call(ContainerBase.java:1575) [catalina.jar:7.0.57]
at org.apache.catalina.core.ContainerBase$StartChild.call(ContainerBase.java:1565) [catalina.jar:7.0.57]
at java.util.concurrent.FutureTask.run(Unknown Source) [na:1.8.0_31]
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) [na:1.8.0_31]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) [na:1.8.0_31]
at java.lang.Thread.run(Unknown Source) [na:1.8.0_31]
Caused by: java.lang.IllegalArgumentException: No poller has been defined for endpoint 'org.springframework.integration.config.ConsumerEndpointFactoryBean#0', and no default poller is available within the context.
at org.springframework.util.Assert.notNull(Assert.java:115) ~[spring-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
at org.springframework.integration.config.ConsumerEndpointFactoryBean.initializeEndpoint(ConsumerEndpointFactoryBean.java:260) ~[spring-integration-core-4.1.6.RELEASE.jar:na]
at org.springframework.integration.config.ConsumerEndpointFactoryBean.afterPropertiesSet(ConsumerEndpointFactoryBean.java:211) ~[spring-integration-core-4.1.6.RELEASE.jar:na]
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.invokeInitMethods(AbstractAutowireCapableBeanFactory.java:1637) ~[spring-beans-4.2.1.RELEASE.jar:4.2.1.RELEASE]
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1574) ~[spring-beans-4.2.1.RELEASE.jar:4.2.1.RELEASE]
... 21 common frames omitted
13:59:35.365 [localhost-startStop-1] DEBUG o.s.b.f.s.DefaultListableBeanFactory - Destroying singletons in org.springframework.beans.factory.support.DefaultListableBeanFactory@76967156: defining beans [channelInitializer,$autoCreateChannelCandidates,IntegrationConfigurationBeanFactoryPostProcessor,integrationEvaluationContext,org.springframework.integration.expression.IntegrationEvaluationContextAwareBeanPostProcessor#0,integrationGlobalProperties,integrationHeaderChannelRegistry,globalChannelInterceptorProcessor,toStringFriendlyJsonNodeToStringConverter,converterRegistrar,integrationConversionService,DefaultConfiguringBeanFactoryPostProcessor,datatypeChannelMessageConverter,messageBuilderFactory,inputFromKafka,kafkaInboundChannelAdapter.source,org.springframework.scheduling.support.PeriodicTrigger#0,kafkaInboundChannelAdapter,consumerContext,zookeeperConnect,org.springframework.integration.handler.MethodInvokingMessageHandler#0,org.springframework.integration.config.ConsumerEndpointFactoryBean#0,kafkaConsumer,org.springframework.integration.dsl.config.IntegrationFlowBeanPostProcessor,integrationRequestMappingHandlerMapping,nullChannel,errorChannel,_org.springframework.integration.errorLogger,taskScheduler,org.springframework.integration.config.IdGeneratorConfigurer#0]; root of factory hierarchy
13:59:35.366 [localhost-startStop-1] DEBUG o.s.b.f.s.DefaultListableBeanFactory - Retrieved dependent beans for bean '(inner bean)#382ecec5': [kafkaInboundChannelAdapter]
13:59:35.367 [localhost-startStop-1] DEBUG o.s.b.f.s.DisposableBeanAdapter - Invoking destroy() on bean with name 'consumerContext'
15/11/17 13:59:35 INFO consumer.ZookeeperConsumerConnector: [default_wassimd-1447757975520-3e9c50da], Connecting to zookeeper instance at 172.16.201.173:2181
15/11/17 13:59:35 INFO zkclient.ZkEventThread: Starting ZkClient event thread.
1条答案
按热度按时间n3schb8v1#
这个错误看起来很明显-你应该一直向下查看堆栈跟踪以找出原因。。。
原因:java.lang.illegalargumentexception:尚未为终结点“org.springframework.integration.config.consumerendpointfactorybean#0”定义轮询器,并且上下文中没有可用的默认轮询器。
自
inputFromKafka
是一个QueueChannel
,您需要对其消费者进行民意调查。但是,使用
QueueChannel
在这里是不必要的;只需移除<int:queue/>
元素,你就不需要轮询器了。