用apachekafkajava和micronaut应用程序设计restapi

ipakzgxi  于 2021-06-04  发布在  Kafka
关注(0)|答案(0)|浏览(253)


我在使用micronaut框架处理restapi crud操作时看到了上面的图表。我有一个单向流,控制器需要从kafka消费者api知道操作的执行情况。
例如
要从数据库中获取所有项目列表,需要在使用者级别执行
用户级的添加/更新/删除操作
我在消费者级别(监听器)有下面的micronautReact式代码来从mongodb获取产品列表

  1. @Topic(ProductTopicConstants.GET_FREE_TEXT_SEARCH)
  2. public Flowable<Product> findByFreeText(String text) {
  3. LOG.info(String.format("Listener --> Listening value = %s", text));
  4. return Flowable.fromPublisher(repository.getCollection("product", Product.class)
  5. .find(new Document("$text",
  6. new Document("$search", text)
  7. .append("$caseSensitive", false)
  8. .append("$diacriticSensitive", false)
  9. )));
  10. }

生产者接口

  1. @KafkaClient
  2. public interface IProductProducer {
  3. @Topic(ProductTopicConstants.GET_FREE_TEXT_SEARCH)
  4. Flowable<Product> findFreeText(String text);
  5. }

生产者的实施

  1. @Override
  2. public Flowable<ProductViewModel> findFreeText(String text) {
  3. LOG.info("Manager --> Finding all the products");
  4. List<ProductViewModel> model = new ArrayList<>();
  5. iProductProducer.findFreeText(text).subscribe(item -> {
  6. System.out.println(item);
  7. }, error ->{
  8. System.out.println(error);
  9. });
  10. return Flowable.just(new ProductViewModel());
  11. }

来自micronaut文档https://micronaut-projects.github.io/micronaut-kafka/latest/guide/#kafkalistenermethods
根据micronaut文件接收和返回React类型

  1. @Topic("reactive-products")
  2. public Single<Product> receive(
  3. @KafkaKey String brand,
  4. Single<Product> productFlowable) {
  5. return productFlowable.doOnSuccess((product) ->
  6. System.out.println("Got Product - " + product.getName() + " by " + brand)
  7. );
  8. }

在我的producer实现中,我订阅时从未收到值。

  1. iProductProducer.findFreeText(text).subscribe(item -> {
  2. System.out.println(item);
  3. }, error ->{
  4. System.out.println(error);
  5. });

我想知道上面的流是否是用kafka执行restapi crud操作的正确方法?

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题