axon 4.3-使用来自主题kafka的消息

w41d8nur  于 2021-06-05  发布在  Kafka
关注(0)|答案(1)|浏览(584)

我使用的是axon版本(4.3),它无缝地支持springboot主类中带有注解的kafka,使用

@SpringBootApplication(exclude = org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration.class)

在我的例子中,消息成功地保存在主题中,但是这个消费者的问题是,我无法使用主题中的消息。
你的配置手册?

<dependency>
   <groupId>org.axonframework</groupId>
   <artifactId>axon-spring-boot-starter</artifactId>
   <version>4.3</version>
   <exclusions>
     <exclusion>
        <groupId>org.axonframework</groupId>
        <artifactId>axon-server-connector</artifactId>
     </exclusion>
   </exclusions>
 </dependency>
 <dependency>
   <groupId>org.springframework.kafka</groupId>
   <artifactId>spring-kafka</artifactId>
 </dependency>
 <dependency>
   <groupId>org.axonframework.extensions.kafka</groupId>
   <artifactId>axon-kafka-spring-boot-starter</artifactId>
   <version>4.0-RC3</version>
 </dependency>

应用程序.yml

axon:
  eventhandling:
    processors:
      conventions:
        source: kafkaMessageSource
        mode: tracking
  serializer:
    general: jackson
  kafka:
    client-id: consumer_service
    default-topic: topic_x
    bootstrap-servers:
    - 127.0.0.1:9092

侦听器.java

//@Component
@ProcessingGroup(value = "conventions")
public class Listener {

 private static final Logger LOGGER = LoggerFactory.getLogger(GenericListener.class);

 @EventHandler
 void on(ConventionCreatedEvent event) {
  LOGGER.info("got the event {}", event);
 }

}
bejyjqdl

bejyjqdl1#

我认为这是Kafka消息源的名字在你的配置,这是罪魁祸首现在艾门。
当使用axon的auto-configuration和axon-kafka auto-configuration时,如果没有任何关于所需kafka消息源类型的细节,则 StreamableKafkaMessageSource 将创建。那个豆子的名字就是 streamableKafkaMessageSource .
在你的 application.yml 然而,你期望 source 为了你的 conventions 要调用的跟踪事件处理器 kafkaMessageSource .
接下来,您可以看看axon的kafka扩展中包含的示例应用程序。也许这会让事情更清楚一点。

相关问题