我开始了一个新的sb应用程序,将作为Kafka的消费者,我开始玩Kafka流,但我得到以下例外启动应用程序时
java.lang.classcastexception:无法将类com.sun.proxy.$proxy101强制转换为类org.springframework.messaging.messagechannel(com.sun.proxy.$proxy101和org.springframework.messaging.messagechannel位于加载程序“app”的未命名模块中)org.springframework.cloud.stream.binder.abstractmessagechannelbinder.dobindconsumer(abstractmessagechannelbinder)。java:91)~[spring-cloud-stream-3.1.2。jar:3.1.2]位于org.springframework.cloud.stream.binder.abstractbinder.bindconsumer(abstractbinder.com)。java:143)~[spring-cloud-stream-3.1.2。jar:3.1.2]在org.springframework.cloud.stream.binding.bindingservice.lambda$rescheduleconsumerbinding$1(bindingservice)。java:201)~[spring-cloud-stream-3.1.2。jar:3.1.2]在org.springframework.scheduling.support.delegatingerrorhandlingrunnable.run(delegatingerrorhandlingrunnable。java:54)~[spring-context-5.3.5。jar:5.3.5]在java.base/java.util.concurrent.executors$runnableadapter.call(executors。java:515)~[na:na]位于java.base/java.util.concurrent.futuretask.run(futuretask。java:264)~[na:na]位于java.base/java.util.concurrent.scheduledthreadpoolexecutor$scheduledfuturetask.run(scheduledthreadpoolexecutor)。java:304)~[na:na]在java.base/java.util.concurrent.threadpoolexecutor.runworker(threadpoolexecutor。java:1128)~[na:na]位于java.base/java.util.concurrent.threadpoolexecutor$worker.run(threadpoolexecutor。java:628)~[na:na]在java.base/java.lang.thread.run(thread。java:834)~[娜:安]
这就是我如何声明一个字被发送多少次的kstream:
@Bean
public Consumer<KStream<Bytes, String>> target() {
return input -> input.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
.map((key, value) -> new KeyValue<>(value, value))
.groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
.windowedBy(TimeWindows.of(Duration.ofMillis(30000))).count(Materialized.as("words-count"))
.toStream().map((key, value) -> new KeyValue<>(null, new WordCount(key.key(), value,
new Date(key.window().start()), new Date(key.window().end()))));
}
使用相同的application.yml并具有仅用于接收这样的消息的使用者函数
@Bean
public Consumer<Message<String>> target() {
return message -> {
System.out.println("******************");
System.out.println("Received message from source: " + message.getPayload());
};
}
一切正常。
我使用以下版本:
springboot v2.4.4版
springcloud版本2020.0.2
java v11.0.10版
在单元测试期间,我也看到过其他帖子出现这种错误,但我甚至还没有进行过任何junit测试。
任何帮助都将不胜感激
1条答案
按热度按时间xmd2e60i1#
看起来类路径中没有kafka streams绑定器,而是消息通道绑定器。确保你有依赖关系
spring-cloud-stream-binder-kakfa-streams
在类路径上。