java 无法在同一个Sping Boot 应用程序中生成和使用Kafka事件

icomxhvb  于 2023-01-11  发布在  Java
关注(0)|答案(1)|浏览(84)

1.在本地计算机localhost:9092上运行Kafka
1.使用Replica 1Partition 3创建了主题ProductOrders
1.使用POJO订单事件、订单事件发布者和订单事件使用者创建了spring-boot 2.4.3spring-kafka 2.6.6spring-cloud-stream 3.1.1应用程序
1.想要在主题ProductOrders上发布OrderEvent并从主题OrderEvent中使用OrderEvent
1.运行应用程序时,我看不到应用程序发布或使用的OrderEvent。同样经过验证的是,即使是kafka-console-consumer也看不到事件。
application.yml

spring:
  cloud.stream:
    bindings:
      orderCreated-out-0:
        destination: ProductOrders
      processOrder-in-0:
        destination: ProductOrders
    kafka.streams.broker:
      configuration:
        commit.interval.ms: 1000
        default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
        default.value.serde: org.springframework.kafka.support.serializer.JsonSerde
        spring.json.trusted.packages: '*'
        spring.json.value.default.type: com.poc.kafka.orderanalytics.events.OrderEvent
      brokers: localhost:9092
      serdeError: logAndContinue

  kafka:
    bootstrap-servers: localhost:9092

OrderEventPublisher.java

public class OrderEventPublisher {
    @Bean
    public Supplier<OrderEvent> orderCreated() {
        log.info("Configuring Order Data");
        Map<String, String[]> orders = new HashMap<>();
        orders.put("home_appliances", new String[] {"Toaster","Grill","Veg Cutter"});
        orders.put("gardening", new String[] {"Coco Pit","Vermi Compost","Petunia Seeds"});
        orders.put("toys", new String[] {"Dump Truck","Fighter Jet","Fire Truck"});
        orders.put("books", new String[] {"Let Us C","Effective Java","Apache Kafka, complete guide"});
        orders.put("cloths", new String[] {"T Shirt","Short","Vests"});
        return () -> {
            log.info("Creating order");
            String category = (String) orders.keySet().toArray()[new Random().nextInt(orders.size())];
            String name = orders.get(category)[new Random().nextInt(orders.get(category).length)];
            OrderEvent orderEvent = new OrderEvent();
            orderEvent.setCategory(category);
            orderEvent.setName(name);
            orderEvent.setQuantity(1);
            return orderEvent;
        };
    }
}

OrderEventConsumer.java

public class OrderEventConsumer {
    @Bean
    public Consumer<OrderEvent> processOrder() {
        log.info("Consuming the order");
        return orderEvent -> {
          log.info("Order Consumed with category: {}, name: {}, qty: {}",orderEvent.getCategory(), orderEvent.getName(), orderEvent.getQuantity());
        };
    }
}

如果我将生产者和消费者分为两个不同的 Spring Boot 应用程序,所有程序都按预期工作。当生产者和消费者同时存在于一个Spring启动应用程序中时,不工作会是什么问题?

dfuffjeb

dfuffjeb1#

很抱歉响应延迟。应用程序的配置出现了问题。由于应用程序中有两个不同的函数,您需要告诉框架需要激活哪些函数。添加以下配置后,我能够运行应用程序,然后生成/使用数据。

spring.cloud.function.definition: orderCreated;processOrder

相关问题