Spring Integration Kafka adapter does not produce messages

py49o6xq  posted on 2021-06-08 in Kafka

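I have been struggling with this for several days.
I am using the Spring Integration (SI) Kafka adapter inside a Spring Boot container.
I have configured ZooKeeper and Kafka on my machine. I also created a console producer and tested it, and everything works (I managed to produce messages from the console and have the console consumer consume them).
I am now trying to produce messages through the Spring Integration Kafka outbound adapter, but the console consumer never receives them.
SI / Spring XD XML: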

<int:publish-subscribe-channel id="inputToKafka"/>

    <int-kafka:outbound-channel-adapter id="kafkaOutboundChannelAdapter"
                                        kafka-producer-context-ref="kafkaProducerContext"
                                        auto-startup="true"
                                        order="1"
                                        channel="inputToKafka">

    </int-kafka:outbound-channel-adapter>

    <int-kafka:producer-context id="kafkaProducerContext">
        <int-kafka:producer-configurations>
            <int-kafka:producer-configuration broker-list="localhost:9092"
                                              async="true"
                                              topic="zerg.hydra"
                                              compression-codec="default"/>
        </int-kafka:producer-configurations>
    </int-kafka:producer-context>

    <task:executor id="taskExecutor" pool-size="5" keep-alive="120" queue-capacity="500"/>
</beans>

Java:

@Named
public class KafkaProducer {

    @Autowired
    @Qualifier("inputToKafka")
    MessageChannel inputToKafka;

    public void sendMessageToKafka(String message)
    {
        inputToKafka.send(
                MessageBuilder.withPayload(message)
                        .setHeader("messageKey", "3")
                        .setHeader("topic", "zerg.hydra").build());

    }

}

This is how I run the Kafka console consumer:

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic zerg.hydra --from-beginning

Log:

Testing started at 12:49 PM ...
12:49:54 PM: Executing external tasks 'cleanTest test'...
:cleanTest
:compileJava
:processResources UP-TO-DATE
:classes
:compileTestJava
:processTestResources UP-TO-DATE
:testClasses
:test
12:50:07,165 |-INFO in ch.qos.logback.classic.LoggerContext[default] - Could NOT find resource [logback.groovy]
12:50:07,166 |-INFO in ch.qos.logback.classic.LoggerContext[default] - Could NOT find resource [logback-test.xml]
12:50:07,166 |-INFO in ch.qos.logback.classic.LoggerContext[default] - Found resource [logback.xml] at [file:/Users/idan/dev/Projects/CalcMicroService/build/resources/test/logback.xml]
12:50:07,167 |-WARN in ch.qos.logback.classic.LoggerContext[default] - Resource [logback.xml] occurs multiple times on the classpath.
12:50:07,167 |-WARN in ch.qos.logback.classic.LoggerContext[default] - Resource [logback.xml] occurs at [file:/Users/idan/dev/Projects/CalcMicroService/build/resources/test/logback.xml]
12:50:07,167 |-WARN in ch.qos.logback.classic.LoggerContext[default] - Resource [logback.xml] occurs at [file:/Users/idan/dev/Projects/CalcMicroService/build/resources/main/logback.xml]
12:50:07,247 |-INFO in ch.qos.logback.classic.joran.action.ConfigurationAction - debug attribute not set
12:50:07,250 |-INFO in ch.qos.logback.core.joran.action.AppenderAction - About to instantiate appender of type [ch.qos.logback.core.ConsoleAppender]
12:50:07,259 |-INFO in ch.qos.logback.core.joran.action.AppenderAction - Naming appender as [stdout]
12:50:07,281 |-INFO in ch.qos.logback.core.joran.action.NestedComplexPropertyIA - Assuming default type [ch.qos.logback.classic.encoder.PatternLayoutEncoder] for [encoder] property
12:50:07,327 |-INFO in ch.qos.logback.classic.joran.action.LoggerAction - Setting level of logger [reactor] to INFO
12:50:07,327 |-INFO in ch.qos.logback.classic.joran.action.LoggerAction - Setting level of logger [org.projectreactor] to INFO
12:50:07,328 |-INFO in ch.qos.logback.classic.joran.action.LoggerAction - Setting level of logger [org.springframework] to WARN
12:50:07,328 |-INFO in ch.qos.logback.classic.joran.action.LoggerAction - Setting level of logger [org.springframework.integration] to DEBUG
12:50:07,328 |-INFO in ch.qos.logback.classic.joran.action.RootLoggerAction - Setting level of ROOT logger to INFO
12:50:07,328 |-INFO in ch.qos.logback.core.joran.action.AppenderRefAction - Attaching appender named [stdout] to Logger[ROOT]
12:50:07,331 |-INFO in ch.qos.logback.classic.joran.action.ConfigurationAction - End of configuration.
12:50:07,332 |-INFO in ch.qos.logback.classic.joran.JoranConfigurator@3de433b4 - Registering current configuration as safe fallback point

  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::        (v1.1.9.RELEASE)

12:50:09.530 [Test worker] INFO  o.s.i.config.IntegrationRegistrar - No bean named 'integrationHeaderChannelRegistry' has been explicitly defined. Therefore, a default DefaultHeaderChannelRegistry will be created.
12:50:09.544 [Test worker] DEBUG o.s.i.config.IntegrationRegistrar - SpEL function '#xpath' isn't registered: there is no spring-integration-xml.jar on the classpath.
12:50:10.717 [Test worker] INFO  o.s.i.c.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'errorChannel' has been explicitly defined. Therefore, a default PublishSubscribeChannel will be created.
12:50:10.719 [Test worker] INFO  o.s.i.c.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'taskScheduler' has been explicitly defined. Therefore, a default ThreadPoolTaskScheduler will be created.
12:50:10.973 DEBUG [Test worker][org.jboss.logging] Logging Provider: org.jboss.logging.Log4jLoggerProvider
12:50:10.974 INFO  [Test worker][org.hibernate.validator.internal.util.Version] HV000001: Hibernate Validator 5.0.3.Final
12:50:10.991 DEBUG [Test worker][org.hibernate.validator.internal.engine.resolver.DefaultTraversableResolver] Cannot find javax.persistence.Persistence on classpath. Assuming non JPA 2 environment. All properties will per default be traversable.
12:50:11.006 DEBUG [Test worker][org.hibernate.validator.internal.engine.ConfigurationImpl] Setting custom MessageInterpolator of type org.springframework.validation.beanvalidation.LocaleContextMessageInterpolator
12:50:11.011 DEBUG [Test worker][org.hibernate.validator.internal.engine.ConfigurationImpl] Setting custom ConstraintValidatorFactory of type org.springframework.validation.beanvalidation.SpringConstraintValidatorFactory
12:50:11.018 DEBUG [Test worker][org.hibernate.validator.internal.engine.ConfigurationImpl] Setting custom ParameterNameProvider of type com.sun.proxy.$Proxy46
12:50:11.024 DEBUG [Test worker][org.hibernate.validator.internal.xml.ValidationXmlParser] Trying to load META-INF/validation.xml for XML based Validator configuration.
12:50:11.032 DEBUG [Test worker][org.hibernate.validator.internal.xml.ValidationXmlParser] No META-INF/validation.xml found. Using annotation based configuration only.
12:50:12.089 [Test worker] INFO  o.a.catalina.core.StandardService - Starting service Tomcat
12:50:12.091 [Test worker] INFO  o.a.catalina.core.StandardEngine - Starting Servlet Engine: Apache Tomcat/7.0.56
12:50:14.162 [localhost-startStop-1] INFO  o.a.c.c.C.[Tomcat].[localhost].[/] - Initializing Spring embedded WebApplicationContext
12:50:16.567 [Test worker] INFO  o.s.i.k.support.ProducerFactoryBean - Using producer properties => {metadata.broker.list=localhost:9092, compression.codec=0, producer.type=async}
12:50:17.036 INFO  [Test worker][kafka.utils.VerifiableProperties] Verifying properties
12:50:17.096 INFO  [Test worker][kafka.utils.VerifiableProperties] Property compression.codec is overridden to 0
12:50:17.096 INFO  [Test worker][kafka.utils.VerifiableProperties] Property metadata.broker.list is overridden to localhost:9092
12:50:17.096 INFO  [Test worker][kafka.utils.VerifiableProperties] Property producer.type is overridden to async
12:50:17.591 DEBUG [Test worker][org.hibernate.validator.internal.engine.resolver.DefaultTraversableResolver] Cannot find javax.persistence.Persistence on classpath. Assuming non JPA 2 environment. All properties will per default be traversable.
12:50:17.591 DEBUG [Test worker][org.hibernate.validator.internal.engine.ConfigurationImpl] Setting custom MessageInterpolator of type org.springframework.validation.beanvalidation.LocaleContextMessageInterpolator
12:50:17.591 DEBUG [Test worker][org.hibernate.validator.internal.engine.ConfigurationImpl] Setting custom ConstraintValidatorFactory of type org.springframework.validation.beanvalidation.SpringConstraintValidatorFactory
12:50:17.591 DEBUG [Test worker][org.hibernate.validator.internal.engine.ConfigurationImpl] Setting custom ParameterNameProvider of type com.sun.proxy.$Proxy46
12:50:17.592 DEBUG [Test worker][org.hibernate.validator.internal.xml.ValidationXmlParser] Trying to load META-INF/validation.xml for XML based Validator configuration.
12:50:17.592 DEBUG [Test worker][org.hibernate.validator.internal.xml.ValidationXmlParser] No META-INF/validation.xml found. Using annotation based configuration only.
12:50:18.967 [Test worker] DEBUG o.s.i.c.GlobalChannelInterceptorProcessor - No global channel interceptors.
12:50:18.978 [Test worker] INFO  o.s.i.endpoint.EventDrivenConsumer - Adding {mongo:outbound-channel-adapter:mongoAdapter.adapter} as a subscriber to the 'mongoAdapter' channel
12:50:18.979 [Test worker] INFO  o.s.i.channel.DirectChannel - Channel 'application:8091.mongoAdapter' has 1 subscriber(s).
12:50:18.979 [Test worker] INFO  o.s.i.endpoint.EventDrivenConsumer - started mongoAdapter.adapter
12:50:18.979 [Test worker] INFO  o.s.i.endpoint.EventDrivenConsumer - Adding {mongo:outbound-channel-adapter:adapterWithConverter.adapter} as a subscriber to the 'adapterWithConverter' channel
12:50:18.979 [Test worker] INFO  o.s.i.channel.DirectChannel - Channel 'application:8091.adapterWithConverter' has 1 subscriber(s).
12:50:18.979 [Test worker] INFO  o.s.i.endpoint.EventDrivenConsumer - started adapterWithConverter.adapter
12:50:18.979 [Test worker] INFO  o.s.i.endpoint.EventDrivenConsumer - Adding {message-handler:kafkaOutboundChannelAdapter} as a subscriber to the 'inputToKafka' channel
12:50:18.979 [Test worker] INFO  o.s.i.c.PublishSubscribeChannel - Channel 'application:8091.inputToKafka' has 1 subscriber(s).
12:50:18.979 [Test worker] INFO  o.s.i.endpoint.EventDrivenConsumer - started kafkaOutboundChannelAdapter
12:50:18.979 [Test worker] INFO  o.s.i.endpoint.EventDrivenConsumer - Adding {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
12:50:18.979 [Test worker] INFO  o.s.i.c.PublishSubscribeChannel - Channel 'application:8091.errorChannel' has 1 subscriber(s).
12:50:18.979 [Test worker] INFO  o.s.i.endpoint.EventDrivenConsumer - started _org.springframework.integration.errorLogger
12:50:19.026 [Test worker] INFO  o.a.coyote.http11.Http11NioProtocol - Initializing ProtocolHandler ["http-nio-8091"]
12:50:19.040 [Test worker] INFO  o.a.coyote.http11.Http11NioProtocol - Starting ProtocolHandler ["http-nio-8091"]
12:50:19.047 [Test worker] INFO  o.a.tomcat.util.net.NioSelectorPool - Using a shared selector for servlet write/read
12:50:19.338 [Test worker] DEBUG o.s.i.c.PublishSubscribeChannel - preSend on channel 'inputToKafka', message: [Payload String content=Hello Kafka From SI][Headers={messageKey=3, topic=zerg.hydra, id=be46fb68-c762-f16e-6ccb-6841ef3fe868, timestamp=1416999019338}]
12:50:19.338 [Test worker] DEBUG o.s.i.k.o.KafkaProducerMessageHandler - org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler#0 received message: [Payload String content=Hello Kafka From SI][Headers={messageKey=3, topic=zerg.hydra, id=be46fb68-c762-f16e-6ccb-6841ef3fe868, timestamp=1416999019338}]
12:50:19.362 [Test worker] DEBUG o.s.i.c.PublishSubscribeChannel - postSend (sent=true) on channel 'inputToKafka', message: [Payload String content=Hello Kafka From SI][Headers={messageKey=3, topic=zerg.hydra, id=be46fb68-c762-f16e-6ccb-6841ef3fe868, timestamp=1416999019338}]
Empty test suite.
12:50:19.402 [Thread-4] INFO  o.s.i.endpoint.EventDrivenConsumer - Removing {mongo:outbound-channel-adapter:mongoAdapter.adapter} as a subscriber to the 'mongoAdapter' channel
12:50:19.417 [Thread-4] INFO  o.s.i.channel.DirectChannel - Channel 'application:8091.mongoAdapter' has 0 subscriber(s).
12:50:19.419 [Thread-4] INFO  o.s.i.endpoint.EventDrivenConsumer - stopped mongoAdapter.adapter
12:50:19.420 [Thread-4] INFO  o.s.i.endpoint.EventDrivenConsumer - Removing {mongo:outbound-channel-adapter:adapterWithConverter.adapter} as a subscriber to the 'adapterWithConverter' channel
12:50:19.422 [Thread-4] INFO  o.s.i.channel.DirectChannel - Channel 'application:8091.adapterWithConverter' has 0 subscriber(s).
12:50:19.422 [Thread-4] INFO  o.s.i.endpoint.EventDrivenConsumer - stopped adapterWithConverter.adapter
12:50:19.423 [Thread-4] INFO  o.s.i.endpoint.EventDrivenConsumer - Removing {message-handler:kafkaOutboundChannelAdapter} as a subscriber to the 'inputToKafka' channel
12:50:19.424 [Thread-4] INFO  o.s.i.c.PublishSubscribeChannel - Channel 'application:8091.inputToKafka' has 0 subscriber(s).
12:50:19.424 [Thread-4] INFO  o.s.i.endpoint.EventDrivenConsumer - stopped kafkaOutboundChannelAdapter
12:50:19.425 [Thread-4] INFO  o.s.i.endpoint.EventDrivenConsumer - Removing {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
12:50:19.426 [Thread-4] INFO  o.s.i.c.PublishSubscribeChannel - Channel 'application:8091.errorChannel' has 0 subscriber(s).
12:50:19.426 [Thread-4] INFO  o.s.i.endpoint.EventDrivenConsumer - stopped _org.springframework.integration.errorLogger
BUILD SUCCESSFUL
Total time: 25.469 secs
12:50:20 PM: External tasks execution finished 'cleanTest test'.

I also tried the official Kafka client producer in the same application, and it works fine:

@Named
public class KafkaProducerJava {

    ProducerConfig config=null;
    Producer<String, String> producer;
    Properties props=null;

    @PostConstruct
    public void init()
    {
        props = new Properties();

        props.put("metadata.broker.list", "localhost:9092");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
       // props.put("partitioner.class", "example.producer.SimplePartitioner");
        props.put("request.required.acks", "1");

    }

    public void sendMsgToKafka(String msg)
    {
        config= new ProducerConfig(props);
        producer=new Producer<String, String>(config);
        KeyedMessage<String, String> data = new KeyedMessage<String, String>("test", "", msg);

        producer.send(data);
        producer.close();
    }
}

Any idea why the messages never reach my consumer when they go through the Spring Integration Kafka adapter?

u91tlkcl (answer #1)

I just ran this in XD and it works fine for me...

$ bin/xd-shell
 _____                           __   _______
/  ___|          (-)             \ \ / /  _  \
\ `--. _ __  _ __ _ _ __   __ _   \ V /| | | |
 `--. \ '_ \| '__| | '_ \ / _` |  / ^ \| | | |
/\__/ / |_) | |  | | | | | (_| | / / \ \ |/ /
\____/| .__/|_|  |_|_| |_|\__, | \/   \/___/
      | |                  __/ |
      |_|                 |___/
eXtreme Data
1.1.0.BUILD-SNAPSHOT | Admin Server Target: http://localhost:9393
Welcome to the Spring XD shell. For assistance hit TAB or type "help".
xd:>stream create --name foo --definition "time | kafka --topic=test" --deploy
Created and deployed new stream 'foo'
xd:>stream destroy foo
Destroyed stream 'foo'
xd:>

$ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
2014-11-26 10:03:09
2014-11-26 10:03:10
2014-11-26 10:03:11
2014-11-26 10:03:12
2014-11-26 10:03:13

I also wrote this test case...

public class OutboundTests {

    @Test
    public void test() throws Exception {
        KafkaProducerContext<String, String> kafkaProducerContext = new KafkaProducerContext<String, String>();
        ProducerMetadata<String, String> producerMetadata = new ProducerMetadata<String, String>("test");
        producerMetadata.setValueClassType(String.class);
        producerMetadata.setKeyClassType(String.class);
        Encoder<String> encoder = new StringEncoder<String>();
        producerMetadata.setValueEncoder(encoder);
        producerMetadata.setKeyEncoder(encoder);
        ProducerFactoryBean<String, String> producer = new ProducerFactoryBean<String, String>(producerMetadata, "localhost:9092");
        ProducerConfiguration<String, String> config = new ProducerConfiguration<String, String>(producerMetadata, producer.getObject());
        kafkaProducerContext.setProducerConfigurations(Collections.singletonMap("test", config));
        KafkaProducerMessageHandler<String, String> handler = new KafkaProducerMessageHandler<String, String>(kafkaProducerContext);
        handler.handleMessage(MessageBuilder.withPayload("foo")
                .setHeader("messagekey", "3")
                .setHeader("topic", "test")
                .build());
    }

}

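...to simulate what you are doing, and that worked too. Do you see any logged exceptions? I notice that you are not setting the key/value types and encoders.
EDIT:
As discussed in the comments, the problem is that you are using the async producer. In the example that "works", you close the producer, which flushes the queue. By default the queue is not flushed for 5 seconds, and the test case does not wait long enough.
I updated the test to include an XML-configured version, reduced queue.buffering.max.ms, and added code to the test so that it waits a few seconds before terminating.
See the new commit for details.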
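
To make the timing issue above concrete, here is a minimal sketch in plain Java. It is a toy model, not the Kafka client: the class BufferingProducerSketch and its methods are hypothetical names invented for illustration. It only mimics the behaviour described above, where send() puts the message in a buffer, a background task flushes the buffer at a fixed interval, and close() flushes immediately.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Toy stand-in for an async producer. Hypothetical class, NOT the Kafka API. */
final class BufferingProducerSketch implements AutoCloseable {
    private final List<String> buffer = new ArrayList<>();
    private final List<String> delivered = new ArrayList<>();
    private final ScheduledExecutorService flusher;

    /** flushIntervalMs plays the role of queue.buffering.max.ms in this model. */
    BufferingProducerSketch(long flushIntervalMs) {
        // Daemon thread: like the real case, it does not keep the JVM alive.
        flusher = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r);
            t.setDaemon(true);
            return t;
        });
        flusher.scheduleAtFixedRate(this::flush, flushIntervalMs,
                flushIntervalMs, TimeUnit.MILLISECONDS);
    }

    /** Only buffers the message; nothing is "on the wire" yet. */
    synchronized void send(String message) {
        buffer.add(message);
    }

    /** Moves everything buffered so far to the delivered list. */
    synchronized void flush() {
        delivered.addAll(buffer);
        buffer.clear();
    }

    /** Returns a snapshot of the messages delivered so far. */
    synchronized List<String> delivered() {
        return new ArrayList<>(delivered);
    }

    /** Stops the background task and flushes whatever is still buffered. */
    @Override
    public void close() {
        flusher.shutdownNow();
        flush();
    }

    public static void main(String[] args) {
        BufferingProducerSketch producer = new BufferingProducerSketch(5000);
        producer.send("Hello Kafka From SI");
        // A test that ended here would exit with the message still buffered.
        System.out.println("before close: " + producer.delivered());
        producer.close();
        System.out.println("after close:  " + producer.delivered());
    }
}
```

In this model the message is still in the buffer right after send(), so a test that exits at that point never delivers it. Closing the producer, shortening the interval, or waiting longer than the interval before exiting all get the message out, which matches the changes described above.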

ql3eal8s (answer #2)

I had been facing the same problem since this morning and finally found a solution: prefix the topic header name with kafka_, as shown below.

.setHeader("kafka_topic", "zerg.hydra")

This comes from inside the API: when it looks up the key in the message headers, it prepends the kafka_ prefix.
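
As a rough illustration of why an unprefixed header goes unnoticed, here is a small sketch in plain Java. PrefixedHeaderLookupSketch and resolve are hypothetical names, and the lookup rule is only an assumption based on the description above; the adapter's real logic may differ between versions.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper that mimics a header lookup using a "kafka_" prefix. */
final class PrefixedHeaderLookupSketch {
    // Assumption taken from the answer above: header names carry this prefix.
    static final String PREFIX = "kafka_";

    /** Returns the value stored under PREFIX + name, or fallback if absent. */
    static String resolve(Map<String, Object> headers, String name, String fallback) {
        Object value = headers.get(PREFIX + name);
        return value != null ? value.toString() : fallback;
    }

    public static void main(String[] args) {
        Map<String, Object> plain = new HashMap<>();
        plain.put("topic", "zerg.hydra");          // header name as in the question

        Map<String, Object> prefixed = new HashMap<>();
        prefixed.put("kafka_topic", "zerg.hydra"); // header name as in this answer

        // The unprefixed header is never found, so the fallback is used.
        System.out.println(resolve(plain, "topic", "<no topic header>"));
        System.out.println(resolve(prefixed, "topic", "<no topic header>"));
    }
}
```

With a lookup like this, a header named topic is simply ignored and only kafka_topic is picked up, which would explain why renaming the header fixes the problem.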
