spring cloud stream kafka streams binder kafkaexception:无法启动流:“listener”不能为null

kkih6yb8  于 2021-07-13  发布在  Java
关注(0)|答案(2)|浏览(474)

我对kafka streams和spring cloud stream还比较陌生,但是我读过一些关于它的好东西,比如将集成相关的代码移到属性文件中,这样开发者就可以把主要精力放在业务逻辑方面。
这里我有一个简单的应用程序类。

package com.some.events.consumer

import com.some.events.SomeEvent
import org.apache.kafka.streams.kstream.KStream
import org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.boot.runApplication
import org.springframework.context.annotation.Bean
import java.util.function.Consumer

@SpringBootApplication
class ConsumerApplication {
    @Bean
    fun consume(): Consumer<KStream<String, SomeEvent>> {
        return Consumer { input -> input.foreach { key, value -> println("Key: $key, value: $value") } }
    }
}

fun main(args: Array<String>) {
    runApplication<ConsumerApplication>(*args)
}

我的 application.yml 文件如下。

spring:
  cloud:
    function:
      definition: consume
    stream:
      bindings:
        consume-in-0:
          destination: "some-event"
          group: "some-event"

我的相依关系 build.gradle.kts 定义如下(此处仅包括相关的定义)。

extra["springCloudVersion"] = "2020.0.2"

dependencies {
    implementation("org.jetbrains.kotlin:kotlin-reflect")
    implementation("org.jetbrains.kotlin:kotlin-stdlib-jdk8")
    implementation("org.springframework.cloud:spring-cloud-stream")
    implementation("org.springframework.cloud:spring-cloud-stream-binder-kafka-streams")
    testImplementation("org.springframework.boot:spring-boot-starter-test")
}

dependencyManagement {
    imports {
        mavenBom("org.springframework.cloud:spring-cloud-dependencies:${property("springCloudVersion")}")
    }
}

当我运行应用程序时,我得到了以下异常。

org.springframework.context.ApplicationContextException: Failed to start bean 'streamsBuilderFactoryManager'; nested exception is org.springframework.kafka.KafkaException: Could not start stream: ; nested exception is java.lang.IllegalArgumentException: 'listener' cannot be null
    at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:181) ~[spring-context-5.3.5.jar:5.3.5]
    at org.springframework.context.support.DefaultLifecycleProcessor.access$200(DefaultLifecycleProcessor.java:54) ~[spring-context-5.3.5.jar:5.3.5]
    at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:356) ~[spring-context-5.3.5.jar:5.3.5]
    at java.base/java.lang.Iterable.forEach(Iterable.java:75) ~[na:na]
    at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:155) ~[spring-context-5.3.5.jar:5.3.5]
    at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:123) ~[spring-context-5.3.5.jar:5.3.5]
    at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:935) ~[spring-context-5.3.5.jar:5.3.5]
    at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:586) ~[spring-context-5.3.5.jar:5.3.5]
    at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:769) ~[spring-boot-2.4.4.jar:2.4.4]
    at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:761) ~[spring-boot-2.4.4.jar:2.4.4]
    at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:426) ~[spring-boot-2.4.4.jar:2.4.4]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:326) ~[spring-boot-2.4.4.jar:2.4.4]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1313) ~[spring-boot-2.4.4.jar:2.4.4]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1302) ~[spring-boot-2.4.4.jar:2.4.4]
    at com.some.events.consumer.ConsumerApplicationKt.main(ConsumerApplication.kt:22) ~[main/:na]
Caused by: org.springframework.kafka.KafkaException: Could not start stream: ; nested exception is java.lang.IllegalArgumentException: 'listener' cannot be null
    at org.springframework.cloud.stream.binder.kafka.streams.StreamsBuilderFactoryManager.start(StreamsBuilderFactoryManager.java:94) ~[spring-cloud-stream-binder-kafka-streams-3.1.2.jar:3.1.2]
    at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:178) ~[spring-context-5.3.5.jar:5.3.5]
    ... 14 common frames omitted
Caused by: java.lang.IllegalArgumentException: 'listener' cannot be null
    at org.springframework.util.Assert.notNull(Assert.java:201) ~[spring-core-5.3.5.jar:5.3.5]
    at org.springframework.kafka.config.StreamsBuilderFactoryBean.addListener(StreamsBuilderFactoryBean.java:268) ~[spring-kafka-2.6.7.jar:2.6.7]
    at org.springframework.cloud.stream.binder.kafka.streams.StreamsBuilderFactoryManager.start(StreamsBuilderFactoryManager.java:84) ~[spring-cloud-stream-binder-kafka-streams-3.1.2.jar:3.1.2]
    ... 15 common frames omitted

Process finished with exit code 1

注意,我知道我需要配置serde和avro相关的东西(我正在使用avro作为事件模式),但问题是,流甚至不会运行。
有人能给我指出正确的方向吗?我试着在google上搜索这个,但是没有人发布一个由“listener”不能为null引起的问题。谢谢!

qjp7pelc

qjp7pelc1#

这是一只虫子;在3.1.3快照中已修复
https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/commit/f25dbff2b7fc0d0c742dd674a9e392057a34c86d
https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/1030#issuecomment-804039087
我不确定那里的评论;将测微计添加到类路径应该可以解决这个问题。

zpf6vheq

zpf6vheq2#

这个 destination: "some-event" 应该指向Kafka的主题。就像 destination: "some-event-topic" .
然后必须为侦听器创建一个接口 consume-in-0 . 使用spring注解将使项目加载这个监听器,它将不再为null。

import org.apache.kafka.streams.kstream.KStream;
import org.springframework.cloud.stream.annotation.Input;

public interface KafkaListenerBinding {
    @Input("consume-in-0")
    KStream<String, String> inputStream();
}

然后创建一个@服务来处理来自侦听器的消息 @StreamListener("consume-in-0") .

import lombok.extern.log4j.Log4j2;
import org.apache.kafka.streams.kstream.KStream;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.stereotype.Service;

@Log4j2
@Service
@EnableBinding(KafkaListenerBinding.class)
public class KafkaListenerService {

    @StreamListener("consume-in-0")
    public void process(KStream<String, String> input) {
        input.foreach((k,v) -> log.info(String.format("Key: %s, Value: %s",k,v)));
    }
}

注意:尽管@garyrussel说了这个bug,我还是要用实现spring服务的函数方式来完成我的回答。功能风格可以通过在 application.yml 文件。使用函数名和posfix有一个内部约定 in-0 以及 out-0 用于绑定。在定义绑定时必须使用此选项。更多细节在这里。

spring:
  cloud:
    stream:
      function:
        definition: transformToUpperCase
      bindings:
        transformToUpperCase-in-0:
          destination: input-func-topic
        transformToUpperCase-out-0:
          destination: output-func-topic

然后你用 @Configuration 以及 @EnableAutoConfiguration 并确保lambda方法与在 application.yml 文件 function.definition .

@Configuration
@EnableAutoConfiguration
public class KafkaListenerFunctionalService {

    @Bean
    public Function<KStream<String, String>, KStream<String, String>> transformToUpperCase() {
        return input -> input
                .peek((k, v) -> log.info("Functional received Input: {}", v))
                .mapValues(i -> i.toUpperCase());
    }
}

相关问题