我在使用micronaut框架处理restapi crud操作时看到了上面的图表。我有一个单向流,控制器需要从kafka消费者api知道操作的执行情况。
例如
要从数据库中获取所有项目列表,需要在使用者级别执行
用户级的添加/更新/删除操作
我在消费者级别(监听器)有下面的micronautReact式代码来从mongodb获取产品列表
@Topic(ProductTopicConstants.GET_FREE_TEXT_SEARCH)
public Flowable<Product> findByFreeText(String text) {
LOG.info(String.format("Listener --> Listening value = %s", text));
return Flowable.fromPublisher(repository.getCollection("product", Product.class)
.find(new Document("$text",
new Document("$search", text)
.append("$caseSensitive", false)
.append("$diacriticSensitive", false)
)));
}
生产者接口
@KafkaClient
public interface IProductProducer {
@Topic(ProductTopicConstants.GET_FREE_TEXT_SEARCH)
Flowable<Product> findFreeText(String text);
}
生产者的实施
@Override
public Flowable<ProductViewModel> findFreeText(String text) {
LOG.info("Manager --> Finding all the products");
List<ProductViewModel> model = new ArrayList<>();
iProductProducer.findFreeText(text).subscribe(item -> {
System.out.println(item);
}, error ->{
System.out.println(error);
});
return Flowable.just(new ProductViewModel());
}
来自micronaut文档https://micronaut-projects.github.io/micronaut-kafka/latest/guide/#kafkalistenermethods
根据micronaut文件接收和返回React类型
@Topic("reactive-products")
public Single<Product> receive(
@KafkaKey String brand,
Single<Product> productFlowable) {
return productFlowable.doOnSuccess((product) ->
System.out.println("Got Product - " + product.getName() + " by " + brand)
);
}
在我的producer实现中,我订阅时从未收到值。
iProductProducer.findFreeText(text).subscribe(item -> {
System.out.println(item);
}, error ->{
System.out.println(error);
});
我想知道上面的流是否是用kafka执行restapi crud操作的正确方法?
暂无答案!
目前还没有任何答案,快来回答吧!