1.在本地计算机localhost:9092
上运行Kafka
1.使用Replica 1
和Partition 3
创建了主题ProductOrders
1.使用POJO订单事件、订单事件发布者和订单事件使用者创建了spring-boot 2.4.3
、spring-kafka 2.6.6
和spring-cloud-stream 3.1.1
应用程序
1.想要在主题ProductOrders
上发布OrderEvent
并从主题OrderEvent
中使用OrderEvent
1.运行应用程序时,我看不到应用程序发布或使用的OrderEvent。同样经过验证的是,即使是kafka-console-consumer
也看不到事件。application.yml
spring:
cloud.stream:
bindings:
orderCreated-out-0:
destination: ProductOrders
processOrder-in-0:
destination: ProductOrders
kafka.streams.broker:
configuration:
commit.interval.ms: 1000
default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
default.value.serde: org.springframework.kafka.support.serializer.JsonSerde
spring.json.trusted.packages: '*'
spring.json.value.default.type: com.poc.kafka.orderanalytics.events.OrderEvent
brokers: localhost:9092
serdeError: logAndContinue
kafka:
bootstrap-servers: localhost:9092
OrderEventPublisher.java
public class OrderEventPublisher {
@Bean
public Supplier<OrderEvent> orderCreated() {
log.info("Configuring Order Data");
Map<String, String[]> orders = new HashMap<>();
orders.put("home_appliances", new String[] {"Toaster","Grill","Veg Cutter"});
orders.put("gardening", new String[] {"Coco Pit","Vermi Compost","Petunia Seeds"});
orders.put("toys", new String[] {"Dump Truck","Fighter Jet","Fire Truck"});
orders.put("books", new String[] {"Let Us C","Effective Java","Apache Kafka, complete guide"});
orders.put("cloths", new String[] {"T Shirt","Short","Vests"});
return () -> {
log.info("Creating order");
String category = (String) orders.keySet().toArray()[new Random().nextInt(orders.size())];
String name = orders.get(category)[new Random().nextInt(orders.get(category).length)];
OrderEvent orderEvent = new OrderEvent();
orderEvent.setCategory(category);
orderEvent.setName(name);
orderEvent.setQuantity(1);
return orderEvent;
};
}
}
OrderEventConsumer.java
public class OrderEventConsumer {
@Bean
public Consumer<OrderEvent> processOrder() {
log.info("Consuming the order");
return orderEvent -> {
log.info("Order Consumed with category: {}, name: {}, qty: {}",orderEvent.getCategory(), orderEvent.getName(), orderEvent.getQuantity());
};
}
}
如果我将生产者和消费者分为两个不同的 Spring Boot 应用程序,所有程序都按预期工作。当生产者和消费者同时存在于一个Spring启动应用程序中时,不工作会是什么问题?
1条答案
按热度按时间dfuffjeb1#
很抱歉响应延迟。应用程序的配置出现了问题。由于应用程序中有两个不同的函数,您需要告诉框架需要激活哪些函数。添加以下配置后,我能够运行应用程序,然后生成/使用数据。