为什么kafka流没有@streamlistener就不能工作?

t40tm48m  于 2021-07-24  发布在  Java
关注(0)|答案(1)|浏览(506)

@streamlistener现在被弃用了,所以我尝试用函数的方式重写我的使用者。
java代码

  1. @Component
  2. public class MyConsumer {
  3. public void handleMyMethod(MessageEntity message) {
  4. ...
  5. }
  6. }
  7. @RequiredArgsConstructor
  8. @Configuration
  9. public class MyConsumerConfiguration {
  10. private final MyConsumer myConsumer;
  11. @Bean
  12. public Consumer<MessageEntity> myMethod() {
  13. return myConsumer::handleMyMethod;
  14. }
  15. }

应用程序.yaml

  1. spring.cloud :
  2. stream :
  3. kafka.binder.brokers :
  4. - "127.0.0.1:9092"
  5. function.definition : myMethod
  6. bindings :
  7. myMethod-in-0 :
  8. destination : service.method.update

测试

  1. @Test
  2. void handleMyMethod() {
  3. MessageEntity message = new MessageEntity();
  4. template.send("service.method.update", message);
  5. await()
  6. .atMost(6, TimeUnit.SECONDS)
  7. .untilAsserted(() -> {
  8. verify(myConsumer, atLeastOnce()).handleMyMethod(entityCaptor.capture());
  9. MessageEntity entity = entityCaptor.getValue();
  10. assertNotNull(entity);
  11. });
  12. }

运行测试。这条消息正在传递给Kafka(我在Kafka工具中看到它),但我的消费者没有捕捉到它。测试失败,错误为:

  1. myConsumer.handleMethod(
  2. <Capturing argument>
  3. );
  4. -> at com.my.project.MyConsumer.handleMethod(MyConsumer.java:33)
  5. Actually, there were zero interactions with this mock.```
6ju8rftf

6ju8rftf1#

我发现了问题。我的util模块集成在上面描述的模块中,它有自己的“function.definition”属性。由于上面的模块是在util模块之前配置的,util模块从顶级模块中删除方法。我这样解决问题:

  1. @Autowired
  2. StreamFunctionProperties streamFunctionProperties;
  3. ...
  4. String definitionString = StringUtils.hasText(streamFunctionProperties.getDefinition()) ?
  5. streamFunctionProperties.getDefinition() + ";" :
  6. "";
  7. streamFunctionProperties.setDefinition(definitionString + "methodDeleteOut;methodUpdateOut;roleUpdateIn;roleDeleteIn");

相关问题