evaluationcontext null与spring集成和kafka

p4rjhz4m  于 2021-06-08  发布在  Kafka
关注(0)|答案(1)|浏览(372)

我试图在spring集成中定义一个简单的消息流,从一个通道读取消息,然后将消息转储到kafka队列中。为此,我使用了spring集成kafka。问题是我有一个 EvaluationContext 我无法破译的错误。
以下是我的xml配置:

<int:channel id="myStreamChannel"/>
<int:gateway id="myGateway" service-interface="com.myApp.MyGateway" >
    <int:method name="process" request-channel="myStreamChannel"/>
</int:gateway>
<int:channel id="activityOutputChannel"/>
<int:transformer input-channel="myStreamChannel" output-channel="activityOutputChannel" ref="activityTransformer"/>    
<int-kafka:outbound-channel-adapter id="kafkaOutboundChannelAdapter"
                                    kafka-producer-context-ref="kafkaProducerContext"
                                    auto-startup="false"
                                    channel="activityOutputChannel"
                                    topic="my-test"
                                    message-key-expression="header.messageKey">
    <int:poller fixed-delay="1000" time-unit="MILLISECONDS" receive-timeout="0" task-executor="taskExecutor"/>
</int-kafka:outbound-channel-adapter>

<task:executor id="taskExecutor"
               pool-size="5-25"
               queue-capacity="20"
               keep-alive="120"/>

<int-kafka:producer-context id="kafkaProducerContext" producer-properties="producerProperties">
    <int-kafka:producer-configurations>
        <int-kafka:producer-configuration broker-list="kafkaserver.com:9092"
                                          key-class-type="java.lang.String"
                                          value-class-type="java.lang.String"
                                          topic="my-test"
                                          key-encoder="stringEncoder"
                                          value-encoder="stringEncoder"
                                          compression-codec="snappy"/>
    </int-kafka:producer-configurations>
</int-kafka:producer-context>

当我通过spring boot运行应用程序时,出现以下异常:
创建名为“org.springframework.integration.kafka.outbound.kafkaproducermessagehandler#0”的bean时出错:调用init方法失败;嵌套异常为java.lang.illegalargumentexception:[Assert失败]-此参数是必需的;不能为空
这是堆栈跟踪中有问题的行:
位于org.springframework.integration.kafka.outbound.kafkaproducermessagehandler.oninit(kafkaproducermessagehandler。java:68)
下面是第68行发生的情况: Assert.notNull(this.evaluationContext); 所以 EvaluationContext 为空。我不知道为什么。
顺便说一下,当我用一个琐碎的 stdout 端点打印消息体,一切正常。
你能告诉我Kafka端点配置有什么问题吗 EvaluationContext 是否可用?

uklbhaso

uklbhaso1#

你的问题属于版本不匹配的地狱。SpringBoot拉入了SpringIntegrationCore版本,它不支持 IntegrationEvaluationContextAware 填充 EvaluationContextKafkaProducerMessageHandler .
因此,您应该将spring integration kafka升级到最新版本:https://github.com/spring-projects/spring-integration-kafka/releases

相关问题