需要关于spring boot kafka stream binder应用程序错误的建议

wkyowqbh  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(537)

我试着从https://cloud.spring.io/spring-cloud-stream-binder-kafka/spring-cloud-stream-binder-kafka.html#_usage_2 地点。
我能成功地建造它。但是在运行它时出现如下所示的错误(java.lang.illegalstateexception:org.springframework.cloud.stream.function.functionconfiguration上的错误处理条件)。有人能帮忙找出代码中的问题吗?
replicatekafkastreamapplication.java文件

package com.example.kafka.replicateKafka;

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.binder.kstream.annotations.KStreamProcessor;
import org.springframework.messaging.handler.annotation.SendTo;

import java.util.Arrays;
import java.util.Date;

@SpringBootApplication
@EnableBinding(KStreamProcessor.class)
public class ReplicateKafkaStreamApplication {

    public static void main(String[] args) {
        SpringApplication.run(ReplicateKafkaStreamApplication.class, args);
    }

    @StreamListener("input")
    @SendTo("output")
    public KStream<?, WordCount> process(KStream<?, String> input) {
        return input
                .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
                .groupBy((key, value) -> value)
                .windowedBy(TimeWindows.of(5000))
                .count(Materialized.as("WordCounts-multi"))
                .toStream()
                .map((key, value) -> new KeyValue<>(null, new WordCount(key.key(), value, new Date(key.window().start()), new Date(key.window().end()))));
    }

}

class WordCount {

    private String word;

    private long count;

    private Date start;

    private Date end;

    public WordCount(String word, long count, Date start, Date end) {
        this.word = word;
        this.count = count;
        this.start = start;
        this.end = end;
    }

    public String getWord() {
        return word;
    }

    public void setWord(String word) {
        this.word = word;
    }

    public long getCount() {
        return count;
    }

    public void setCount(long count) {
        this.count = count;
    }

    public Date getStart() {
        return start;
    }

    public void setStart(Date start) {
        this.start = start;
    }

    public Date getEnd() {
        return end;
    }

    public void setEnd(Date end) {
        this.end = end;
    }
}

pom.xml文件

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.3.3.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.example.kafka</groupId>
    <artifactId>replicateKafkaStream</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>replicateKafkaStream</name>
    <description>Demo project for replicating kafka</description>

    <properties>
        <java.version>11</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-kafka</artifactId>
            <version>3.0.7.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-kstream</artifactId>
            <version>1.3.4.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
            <version>2.0.0.RELEASE</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

错误

.   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::        (v2.3.3.RELEASE)

2020-08-19 18:48:26.339  INFO 13724 --- [           main] c.e.k.r.ReplicateKafkaStreamApplication  : Starting ReplicateKafkaStreamApplication on DESKTOP-NS4292D with PID 13724 (C:\JavaReskillGit\replicateKafkaStream\target\classes started by BishwajeetNaik in C:\JavaReskillGit\replicateKafkaStream)
2020-08-19 18:48:26.344  INFO 13724 --- [           main] c.e.k.r.ReplicateKafkaStreamApplication  : No active profile set, falling back to default profiles: default
2020-08-19 18:48:27.786 ERROR 13724 --- [           main] o.s.boot.SpringApplication               : Application run failed

java.lang.IllegalStateException: Error processing condition on org.springframework.cloud.stream.function.FunctionConfiguration
    at org.springframework.boot.autoconfigure.condition.SpringBootCondition.matches(SpringBootCondition.java:60) ~[spring-boot-autoconfigure-2.3.3.RELEASE.jar:2.3.3.RELEASE]
    at org.springframework.context.annotation.ConditionEvaluator.shouldSkip(ConditionEvaluator.java:108) ~[spring-context-5.2.8.RELEASE.jar:5.2.8.RELEASE]
    at org.springframework.context.annotation.ConfigurationClassBeanDefinitionReader$TrackedConditionEvaluator.shouldSkip(ConfigurationClassBeanDefinitionReader.java:469) ~[spring-context-5.2.8.RELEASE.jar:5.2.8.RELEASE]
    at org.springframework.context.annotation.ConfigurationClassBeanDefinitionReader.loadBeanDefinitionsForConfigurationClass(ConfigurationClassBeanDefinitionReader.java:131) ~[spring-context-5.2.8.RELEASE.jar:5.2.8.RELEASE]
    at org.springframework.context.annotation.ConfigurationClassBeanDefinitionReader.loadBeanDefinitions(ConfigurationClassBeanDefinitionReader.java:120) ~[spring-context-5.2.8.RELEASE.jar:5.2.8.RELEASE]
    at org.springframework.context.annotation.ConfigurationClassPostProcessor.processConfigBeanDefinitions(ConfigurationClassPostProcessor.java:331) ~[spring-context-5.2.8.RELEASE.jar:5.2.8.RELEASE]
    at org.springframework.context.annotation.ConfigurationClassPostProcessor.postProcessBeanDefinitionRegistry(ConfigurationClassPostProcessor.java:236) ~[spring-context-5.2.8.RELEASE.jar:5.2.8.RELEASE]
    at org.springframework.context.support.PostProcessorRegistrationDelegate.invokeBeanDefinitionRegistryPostProcessors(PostProcessorRegistrationDelegate.java:280) ~[spring-context-5.2.8.RELEASE.jar:5.2.8.RELEASE]
    at org.springframework.context.support.PostProcessorRegistrationDelegate.invokeBeanFactoryPostProcessors(PostProcessorRegistrationDelegate.java:96) ~[spring-context-5.2.8.RELEASE.jar:5.2.8.RELEASE]
    at org.springframework.context.support.AbstractApplicationContext.invokeBeanFactoryPostProcessors(AbstractApplicationContext.java:707) ~[spring-context-5.2.8.RELEASE.jar:5.2.8.RELEASE]
    at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:533) ~[spring-context-5.2.8.RELEASE.jar:5.2.8.RELEASE]
    at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:758) ~[spring-boot-2.3.3.RELEASE.jar:2.3.3.RELEASE]
    at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:750) ~[spring-boot-2.3.3.RELEASE.jar:2.3.3.RELEASE]
    at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:397) ~[spring-boot-2.3.3.RELEASE.jar:2.3.3.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:315) ~[spring-boot-2.3.3.RELEASE.jar:2.3.3.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1237) ~[spring-boot-2.3.3.RELEASE.jar:2.3.3.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1226) ~[spring-boot-2.3.3.RELEASE.jar:2.3.3.RELEASE]
    at com.example.kafka.replicateKafka.ReplicateKafkaStreamApplication.main(ReplicateKafkaStreamApplication.java:22) ~[classes/:na]
Caused by: java.lang.IllegalStateException: Failed to introspect Class [org.springframework.cloud.stream.binder.kstream.config.KStreamBinderSupportAutoConfiguration] from ClassLoader [jdk.internal.loader.ClassLoaders$AppClassLoader@368239c8]
    at org.springframework.util.ReflectionUtils.getDeclaredMethods(ReflectionUtils.java:481) ~[spring-core-5.2.8.RELEASE.jar:5.2.8.RELEASE]
    at org.springframework.util.ReflectionUtils.doWithMethods(ReflectionUtils.java:358) ~[spring-core-5.2.8.RELEASE.jar:5.2.8.RELEASE]
    at org.springframework.util.ReflectionUtils.getUniqueDeclaredMethods(ReflectionUtils.java:414) ~[spring-core-5.2.8.RELEASE.jar:5.2.8.RELEASE]
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.lambda$getTypeForFactoryMethod$2(AbstractAutowireCapableBeanFactory.java:742) ~[spring-beans-5.2.8.RELEASE.jar:5.2.8.RELEASE]
    at java.base/java.util.concurrent.ConcurrentHashMap.computeIfAbsent(ConcurrentHashMap.java:1705) ~[na:na]
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.getTypeForFactoryMethod(AbstractAutowireCapableBeanFactory.java:741) ~[spring-beans-5.2.8.RELEASE.jar:5.2.8.RELEASE]
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.determineTargetType(AbstractAutowireCapableBeanFactory.java:680) ~[spring-beans-5.2.8.RELEASE.jar:5.2.8.RELEASE]
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.predictBeanType(AbstractAutowireCapableBeanFactory.java:648) ~[spring-beans-5.2.8.RELEASE.jar:5.2.8.RELEASE]
    at org.springframework.beans.factory.support.AbstractBeanFactory.isFactoryBean(AbstractBeanFactory.java:1614) ~[spring-beans-5.2.8.RELEASE.jar:5.2.8.RELEASE]
    at org.springframework.beans.factory.support.DefaultListableBeanFactory.doGetBeanNamesForType(DefaultListableBeanFactory.java:523) ~[spring-beans-5.2.8.RELEASE.jar:5.2.8.RELEASE]
    at org.springframework.beans.factory.support.DefaultListableBeanFactory.getBeanNamesForType(DefaultListableBeanFactory.java:495) ~[spring-beans-5.2.8.RELEASE.jar:5.2.8.RELEASE]
    at org.springframework.boot.autoconfigure.condition.OnBeanCondition.collectBeanNamesForType(OnBeanCondition.java:238) ~[spring-boot-autoconfigure-2.3.3.RELEASE.jar:2.3.3.RELEASE]
    at org.springframework.boot.autoconfigure.condition.OnBeanCondition.getBeanNamesForType(OnBeanCondition.java:231) ~[spring-boot-autoconfigure-2.3.3.RELEASE.jar:2.3.3.RELEASE]
    at org.springframework.boot.autoconfigure.condition.OnBeanCondition.getBeanNamesForType(OnBeanCondition.java:221) ~[spring-boot-autoconfigure-2.3.3.RELEASE.jar:2.3.3.RELEASE]
    at org.springframework.boot.autoconfigure.condition.OnBeanCondition.getMatchingBeans(OnBeanCondition.java:169) ~[spring-boot-autoconfigure-2.3.3.RELEASE.jar:2.3.3.RELEASE]
    at org.springframework.boot.autoconfigure.condition.OnBeanCondition.getMatchOutcome(OnBeanCondition.java:119) ~[spring-boot-autoconfigure-2.3.3.RELEASE.jar:2.3.3.RELEASE]
    at org.springframework.boot.autoconfigure.condition.SpringBootCondition.matches(SpringBootCondition.java:47) ~[spring-boot-autoconfigure-2.3.3.RELEASE.jar:2.3.3.RELEASE]
    ... 17 common frames omitted
Caused by: java.lang.NoClassDefFoundError: org/springframework/kafka/core/KStreamBuilderFactoryBean
    at java.base/java.lang.Class.getDeclaredMethods0(Native Method) ~[na:na]
    at java.base/java.lang.Class.privateGetDeclaredMethods(Class.java:3166) ~[na:na]
    at java.base/java.lang.Class.getDeclaredMethods(Class.java:2309) ~[na:na]
    at org.springframework.util.ReflectionUtils.getDeclaredMethods(ReflectionUtils.java:463) ~[spring-core-5.2.8.RELEASE.jar:5.2.8.RELEASE]
    ... 33 common frames omitted
Caused by: java.lang.ClassNotFoundException: org.springframework.kafka.core.KStreamBuilderFactoryBean
    at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581) ~[na:na]
    at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178) ~[na:na]
    at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:521) ~[na:na]
    ... 37 common frames omitted

2020-08-19 18:48:27.801  WARN 13724 --- [           main] o.s.boot.SpringApplication               : Unable to close ApplicationContext

java.lang.IllegalStateException: Failed to introspect Class [org.springframework.cloud.stream.binder.kstream.config.KStreamBinderSupportAutoConfiguration] from ClassLoader [jdk.internal.loader.ClassLoaders$AppClassLoader@368239c8]
    at org.springframework.util.ReflectionUtils.getDeclaredMethods(ReflectionUtils.java:481) ~[spring-core-5.2.8.RELEASE.jar:5.2.8.RELEASE]
    at org.springframework.util.ReflectionUtils.doWithMethods(ReflectionUtils.java:358) ~[spring-core-5.2.8.RELEASE.jar:5.2.8.RELEASE]
    at org.springframework.util.ReflectionUtils.getUniqueDeclaredMethods(ReflectionUtils.java:414) ~[spring-core-5.2.8.RELEASE.jar:5.2.8.RELEASE]
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.lambda$getTypeForFactoryMethod$2(AbstractAutowireCapableBeanFactory.java:742) ~[spring-beans-5.2.8.RELEASE.jar:5.2.8.RELEASE]
    at java.base/java.util.concurrent.ConcurrentHashMap.computeIfAbsent(ConcurrentHashMap.java:1705) ~[na:na]
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.getTypeForFactoryMethod(AbstractAutowireCapableBeanFactory.java:741) ~[spring-beans-5.2.8.RELEASE.jar:5.2.8.RELEASE]
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.determineTargetType(AbstractAutowireCapableBeanFactory.java:680) ~[spring-beans-5.2.8.RELEASE.jar:5.2.8.RELEASE]
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.predictBeanType(AbstractAutowireCapableBeanFactory.java:648) ~[spring-beans-5.2.8.RELEASE.jar:5.2.8.RELEASE]
    at org.springframework.beans.factory.support.AbstractBeanFactory.isFactoryBean(AbstractBeanFactory.java:1614) ~[spring-beans-5.2.8.RELEASE.jar:5.2.8.RELEASE]
    at org.springframework.beans.factory.support.DefaultListableBeanFactory.doGetBeanNamesForType(DefaultListableBeanFactory.java:523) ~[spring-beans-5.2.8.RELEASE.jar:5.2.8.RELEASE]
    at org.springframework.beans.factory.support.DefaultListableBeanFactory.getBeanNamesForType(DefaultListableBeanFactory.java:495) ~[spring-beans-5.2.8.RELEASE.jar:5.2.8.RELEASE]
    at org.springframework.beans.factory.support.DefaultListableBeanFactory.getBeansOfType(DefaultListableBeanFactory.java:620) ~[spring-beans-5.2.8.RELEASE.jar:5.2.8.RELEASE]
    at org.springframework.beans.factory.support.DefaultListableBeanFactory.getBeansOfType(DefaultListableBeanFactory.java:612) ~[spring-beans-5.2.8.RELEASE.jar:5.2.8.RELEASE]
    at org.springframework.context.support.AbstractApplicationContext.getBeansOfType(AbstractApplicationContext.java:1243) ~[spring-context-5.2.8.RELEASE.jar:5.2.8.RELEASE]
    at org.springframework.boot.SpringApplication.getExitCodeFromMappedException(SpringApplication.java:880) ~[spring-boot-2.3.3.RELEASE.jar:2.3.3.RELEASE]
    at org.springframework.boot.SpringApplication.getExitCodeFromException(SpringApplication.java:868) ~[spring-boot-2.3.3.RELEASE.jar:2.3.3.RELEASE]
    at org.springframework.boot.SpringApplication.handleExitCode(SpringApplication.java:855) ~[spring-boot-2.3.3.RELEASE.jar:2.3.3.RELEASE]
    at org.springframework.boot.SpringApplication.handleRunFailure(SpringApplication.java:806) ~[spring-boot-2.3.3.RELEASE.jar:2.3.3.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:325) ~[spring-boot-2.3.3.RELEASE.jar:2.3.3.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1237) ~[spring-boot-2.3.3.RELEASE.jar:2.3.3.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1226) ~[spring-boot-2.3.3.RELEASE.jar:2.3.3.RELEASE]
    at com.example.kafka.replicateKafka.ReplicateKafkaStreamApplication.main(ReplicateKafkaStreamApplication.java:22) ~[classes/:na]
Caused by: java.lang.NoClassDefFoundError: org/springframework/kafka/core/KStreamBuilderFactoryBean
    at java.base/java.lang.Class.getDeclaredMethods0(Native Method) ~[na:na]
    at java.base/java.lang.Class.privateGetDeclaredMethods(Class.java:3166) ~[na:na]
    at java.base/java.lang.Class.getDeclaredMethods(Class.java:2309) ~[na:na]
    at org.springframework.util.ReflectionUtils.getDeclaredMethods(ReflectionUtils.java:463) ~[spring-core-5.2.8.RELEASE.jar:5.2.8.RELEASE]
    ... 21 common frames omitted
Caused by: java.lang.ClassNotFoundException: org.springframework.kafka.core.KStreamBuilderFactoryBean
    at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581) ~[na:na]
    at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178) ~[na:na]
    at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:521) ~[na:na]
    ... 25 common frames omitted

Process finished with exit code 1
imzjd6km

imzjd6km1#

为了正确加载依赖项,您需要使用匹配的spring版本
真正的错误是noclassdeffounderror
删除“依赖项”部分中的所有版本,并更新要使用的父级

<artifactId>spring-cloud-starter-stream-kafka</artifactId>

并将其从依赖项中移除

相关问题