Spring Integration Kafka problem on Tomcat

Posted by gzjq41n4 on 2021-06-08 in Kafka

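I am trying to receive messages from Kafka and process them with a Java method. I have attached the files that should help you understand and locate the problem. When Tomcat starts, I get the error org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'org.springframework.integration.config.ConsumerEndpointFactoryBean#0', as shown in the log below.
web.xml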

<?xml version="1.0" encoding="UTF-8"?>
<web-app xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://java.sun.com/xml/ns/javaee" xsi:schemaLocation="http://java.sun.com/xml/ns/javaee http://java.sun.com/xml/ns/javaee/web-app_3_0.xsd" id="WebApp_ID" version="3.0">
  <display-name>cosmob</display-name>
  <welcome-file-list>
    <welcome-file>index.html</welcome-file>
    <welcome-file>index.htm</welcome-file>
    <welcome-file>index.jsp</welcome-file>
    <welcome-file>default.html</welcome-file>
    <welcome-file>default.htm</welcome-file>
    <welcome-file>default.jsp</welcome-file>
  </welcome-file-list>

    <context-param>
        <param-name>contextConfigLocation</param-name>
        <param-value>/WEB-INF/beans.xml</param-value>
    </context-param>

    <context-param>
        <param-name>contextConfigLocation</param-name>
        <param-value>/WEB-INF/intergration-cfg-app.xml</param-value>
    </context-param>

    <servlet>
        <servlet-name>CXFServlet</servlet-name>
        <servlet-class>org.apache.cxf.transport.servlet.CXFServlet</servlet-class>
    </servlet>
    <servlet-mapping>
        <servlet-name>CXFServlet</servlet-name>
        <url-pattern>/rest/*</url-pattern>
    </servlet-mapping>

    <servlet>
        <servlet-name>WsServlet</servlet-name>
        <servlet-class>org.cosmob.ccc.web.service.WsServlet</servlet-class>
    </servlet>

    <servlet-mapping>
        <servlet-name>WsServlet</servlet-name>
        <url-pattern>/ws/positions</url-pattern>
    </servlet-mapping>

    <listener>
        <listener-class>org.springframework.web.context.ContextLoaderListener</listener-class>
    </listener>

</web-app>

integration-cfg-app.xml

<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:int="http://www.springframework.org/schema/integration"
       xmlns:int-kafka="http://www.springframework.org/schema/integration/kafka"
       xmlns:task="http://www.springframework.org/schema/task"
       xsi:schemaLocation="http://www.springframework.org/schema/integration/kafka http://www.springframework.org/schema/integration/kafka/spring-integration-kafka.xsd
        http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd">

    <int:channel id="inputFromKafka">
        <int:queue/>
    </int:channel>

    <int-kafka:inbound-channel-adapter id="kafkaInboundChannelAdapter"
                                           kafka-consumer-context-ref="consumerContext"
                                           auto-startup="false"
                                           channel="inputFromKafka">
            <int:poller default="true" fixed-delay="1000" time-unit="MILLISECONDS" max-messages-per-poll="5"/>
    </int-kafka:inbound-channel-adapter>

    <int-kafka:consumer-context id="consumerContext"
                                   consumer-timeout="1000"
                                   zookeeper-connect="zookeeperConnect">
           <int-kafka:consumer-configurations>
                <int-kafka:consumer-configuration group-id="default"
                       max-messages="5000">
                   <int-kafka:topic id="test2" streams="2"/>
                </int-kafka:consumer-configuration>

           </int-kafka:consumer-configurations>
   </int-kafka:consumer-context>

   <int-kafka:zookeeper-connect id="zookeeperConnect" zk-connect="172.16.201.173:2181" zk-connection-timeout="6000"
                        zk-session-timeout="6000"
                        zk-sync-time="2000" />

   <int:outbound-channel-adapter channel="inputFromKafka" ref="kafkaConsumer" method="processMessage" />

  <bean id="kafkaConsumer" class="org.cosmob.kafka.KafkaConsumer"/>

</beans>

KafkaConsumer.java

package org.cosmob.kafka;

import java.util.List;
import java.util.Map;

public class KafkaConsumer {

    // Invoked by the outbound channel adapter. The payload maps
    // topic -> partition -> list of raw message bytes.
    public void processMessage(Map<String, Map<Integer, List<byte[]>>> msgs) {
        for (Map.Entry<String, Map<Integer, List<byte[]>>> entry : msgs.entrySet()) {
            System.out.println("Topic:" + entry.getKey());
            // Use the Map interface rather than casting to ConcurrentHashMap.
            Map<Integer, List<byte[]>> messages = entry.getValue();
            System.out.println("\n****Partition: \n");
            for (Integer partition : messages.keySet()) {
                System.out.println("p:" + partition);
            }
            System.out.println("\n**************\n");
            for (List<byte[]> list : messages.values()) {
                for (byte[] bytes : list) {
                    // Decodes with the platform default charset, as in the original code.
                    String message = new String(bytes);
                    System.out.println("Message: " + message);
                }
            }
        }
    }

}

Error when starting Tomcat

13:59:34.401 [localhost-startStop-1] DEBUG o.s.b.f.s.DefaultListableBeanFactory - Returning cached instance of singleton bean 'org.springframework.integration.handler.MethodInvokingMessageHandler#0'
13:59:34.404 [localhost-startStop-1] DEBUG o.s.b.f.s.DefaultListableBeanFactory - Invoking afterPropertiesSet() on bean with name 'org.springframework.integration.config.ConsumerEndpointFactoryBean#0'
13:59:34.404 [localhost-startStop-1] DEBUG o.s.b.f.s.DefaultListableBeanFactory - Returning cached instance of singleton bean 'inputFromKafka'
13:59:35.364 [localhost-startStop-1] WARN  o.s.w.c.s.XmlWebApplicationContext - Exception encountered during context initialization - cancelling refresh attempt
org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'org.springframework.integration.config.ConsumerEndpointFactoryBean#0': Invocation of init method failed; nested exception is java.lang.IllegalArgumentException: No poller has been defined for endpoint 'org.springframework.integration.config.ConsumerEndpointFactoryBean#0', and no default poller is available within the context.
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1578) ~[spring-beans-4.2.1.RELEASE.jar:4.2.1.RELEASE]
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:545) ~[spring-beans-4.2.1.RELEASE.jar:4.2.1.RELEASE]
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:482) ~[spring-beans-4.2.1.RELEASE.jar:4.2.1.RELEASE]
    at org.springframework.beans.factory.support.AbstractBeanFactory$1.getObject(AbstractBeanFactory.java:305) ~[spring-beans-4.2.1.RELEASE.jar:4.2.1.RELEASE]
    at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:230) ~[spring-beans-4.2.1.RELEASE.jar:4.2.1.RELEASE]
    at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:301) ~[spring-beans-4.2.1.RELEASE.jar:4.2.1.RELEASE]
    at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:196) ~[spring-beans-4.2.1.RELEASE.jar:4.2.1.RELEASE]
    at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:753) ~[spring-beans-4.2.1.RELEASE.jar:4.2.1.RELEASE]
    at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:834) ~[spring-context-4.2.1.RELEASE.jar:4.2.1.RELEASE]
    at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:537) ~[spring-context-4.2.1.RELEASE.jar:4.2.1.RELEASE]
    at org.springframework.web.context.ContextLoader.configureAndRefreshWebApplicationContext(ContextLoader.java:446) [spring-web-4.2.1.RELEASE.jar:4.2.1.RELEASE]
    at org.springframework.web.context.ContextLoader.initWebApplicationContext(ContextLoader.java:328) [spring-web-4.2.1.RELEASE.jar:4.2.1.RELEASE]
    at org.springframework.web.context.ContextLoaderListener.contextInitialized(ContextLoaderListener.java:107) [spring-web-4.2.1.RELEASE.jar:4.2.1.RELEASE]
    at org.apache.catalina.core.StandardContext.listenerStart(StandardContext.java:4994) [catalina.jar:7.0.57]
    at org.apache.catalina.core.StandardContext.startInternal(StandardContext.java:5492) [catalina.jar:7.0.57]
    at org.apache.catalina.util.LifecycleBase.start(LifecycleBase.java:150) [catalina.jar:7.0.57]
    at org.apache.catalina.core.ContainerBase$StartChild.call(ContainerBase.java:1575) [catalina.jar:7.0.57]
    at org.apache.catalina.core.ContainerBase$StartChild.call(ContainerBase.java:1565) [catalina.jar:7.0.57]
    at java.util.concurrent.FutureTask.run(Unknown Source) [na:1.8.0_31]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) [na:1.8.0_31]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) [na:1.8.0_31]
    at java.lang.Thread.run(Unknown Source) [na:1.8.0_31]
Caused by: java.lang.IllegalArgumentException: No poller has been defined for endpoint 'org.springframework.integration.config.ConsumerEndpointFactoryBean#0', and no default poller is available within the context.
    at org.springframework.util.Assert.notNull(Assert.java:115) ~[spring-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
    at org.springframework.integration.config.ConsumerEndpointFactoryBean.initializeEndpoint(ConsumerEndpointFactoryBean.java:260) ~[spring-integration-core-4.1.6.RELEASE.jar:na]
    at org.springframework.integration.config.ConsumerEndpointFactoryBean.afterPropertiesSet(ConsumerEndpointFactoryBean.java:211) ~[spring-integration-core-4.1.6.RELEASE.jar:na]
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.invokeInitMethods(AbstractAutowireCapableBeanFactory.java:1637) ~[spring-beans-4.2.1.RELEASE.jar:4.2.1.RELEASE]
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1574) ~[spring-beans-4.2.1.RELEASE.jar:4.2.1.RELEASE]
    ... 21 common frames omitted
13:59:35.365 [localhost-startStop-1] DEBUG o.s.b.f.s.DefaultListableBeanFactory - Destroying singletons in org.springframework.beans.factory.support.DefaultListableBeanFactory@76967156: defining beans [channelInitializer,$autoCreateChannelCandidates,IntegrationConfigurationBeanFactoryPostProcessor,integrationEvaluationContext,org.springframework.integration.expression.IntegrationEvaluationContextAwareBeanPostProcessor#0,integrationGlobalProperties,integrationHeaderChannelRegistry,globalChannelInterceptorProcessor,toStringFriendlyJsonNodeToStringConverter,converterRegistrar,integrationConversionService,DefaultConfiguringBeanFactoryPostProcessor,datatypeChannelMessageConverter,messageBuilderFactory,inputFromKafka,kafkaInboundChannelAdapter.source,org.springframework.scheduling.support.PeriodicTrigger#0,kafkaInboundChannelAdapter,consumerContext,zookeeperConnect,org.springframework.integration.handler.MethodInvokingMessageHandler#0,org.springframework.integration.config.ConsumerEndpointFactoryBean#0,kafkaConsumer,org.springframework.integration.dsl.config.IntegrationFlowBeanPostProcessor,integrationRequestMappingHandlerMapping,nullChannel,errorChannel,_org.springframework.integration.errorLogger,taskScheduler,org.springframework.integration.config.IdGeneratorConfigurer#0]; root of factory hierarchy
13:59:35.366 [localhost-startStop-1] DEBUG o.s.b.f.s.DefaultListableBeanFactory - Retrieved dependent beans for bean '(inner bean)#382ecec5': [kafkaInboundChannelAdapter]
13:59:35.367 [localhost-startStop-1] DEBUG o.s.b.f.s.DisposableBeanAdapter - Invoking destroy() on bean with name 'consumerContext'
15/11/17 13:59:35 INFO consumer.ZookeeperConsumerConnector: [default_wassimd-1447757975520-3e9c50da], Connecting to zookeeper instance at 172.16.201.173:2181
15/11/17 13:59:35 INFO zkclient.ZkEventThread: Starting ZkClient event thread.
Answer #1 (n3schb8v)

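The error seems pretty clear. You should always read down the stack trace to find the root cause:
Caused by: java.lang.IllegalArgumentException: No poller has been defined for endpoint 'org.springframework.integration.config.ConsumerEndpointFactoryBean#0', and no default poller is available within the context.
inputFromKafka is a QueueChannel, so the endpoint that consumes from it has to poll it. The poller on the inbound channel adapter only controls how that adapter polls Kafka; the outbound channel adapter reading from the queue needs a poller of its own.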

<int:outbound-channel-adapter channel="inputFromKafka"... >
    <int:poller ... />
</int:outbound-channel-adapter>
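
Filling in the elided attributes from the question's own configuration, a minimal sketch of the polled variant might look like the following. The `fixed-delay` value of 1000 ms is only an assumption for illustration; the answer does not specify any poller settings.

```xml
<!-- Sketch: channel, ref and method are copied from the question; fixed-delay is an assumed value -->
<int:outbound-channel-adapter channel="inputFromKafka" ref="kafkaConsumer" method="processMessage">
    <int:poller fixed-delay="1000"/>
</int:outbound-channel-adapter>
```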

However, using a QueueChannel is unnecessary here; simply remove the <int:queue/> element and you won't need a poller.
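
As a sketch of the option without a queue, and assuming the rest of the question's configuration stays as posted, the channel and its consumer could be declared as below. An `<int:channel>` with no `<int:queue/>` child is a DirectChannel by default, so messages are handed to the subscriber directly instead of being polled from a queue.

```xml
<!-- Sketch: without <int:queue/>, the channel is a DirectChannel by default -->
<int:channel id="inputFromKafka"/>

<!-- Same attributes as in the question; a subscribable channel needs no poller on its consumer -->
<int:outbound-channel-adapter channel="inputFromKafka" ref="kafkaConsumer" method="processMessage"/>
```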
