Quarkus + Kafka + SmallRye exception handling

mefy6pfw  posted on 2021-06-05  in Kafka

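How can I handle exceptions in stream processing with Quarkus + Kafka + SmallRye?
My code is very similar to the imperative producer example in the Quarkus guide (https://quarkus.io/guides/kafka#imperative-usage):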

import io.smallrye.reactive.messaging.annotations.Channel;
import io.smallrye.reactive.messaging.annotations.Emitter;

import javax.inject.Inject;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.Consumes;
import javax.ws.rs.core.MediaType;

@Path("/prices")
public class PriceResource {

    @Inject @Channel("price-create") Emitter<Double> priceEmitter;

    @POST
    @Consumes(MediaType.TEXT_PLAIN)
    public void addPrice(Double price) {
        priceEmitter.send(price);
    }
}

I want something similar to the vanilla Kafka client library, which lets you pass a callback that is invoked for every record sent:

ProducerRecord<String, String> record = new ProducerRecord<>("topic-name", key, value);

producer.send(record, new Callback() {
    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        logger.info(record.toString());

        if (exception != null) {
            logger.error("Producer exception", exception);
        }
    }
});

Thanks.

c7rzv4ha #1

The documentation has a section on acknowledgment:

@Incoming("i")
@Outgoing("j")
@Acknowledgment(Acknowledgment.Strategy.MANUAL)
public CompletionStage<Message<String>> manualAck(Message<String> input) {
    return CompletableFuture.supplyAsync(input::getPayload)
        .thenApply(Message::of)
        .thenCompose(m -> input.ack().thenApply(x -> m));
}
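
For the producer side in the question, a per-record callback can probably be built on the same acknowledgment mechanism. The sketch below assumes the emitter's send(payload) returns a CompletionStage<Void> that completes when the record is acknowledged and fails when it is not. That is the signature of org.eclipse.microprofile.reactive.messaging.Emitter; the older io.smallrye.reactive.messaging.annotations.Emitter imported in the question may behave differently. So that it runs without a broker, the emitter is replaced by a hypothetical PriceSender interface, and LOG stands in for a real logger.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

public class SendCallbackSketch {

    /** Hypothetical stand-in for an emitter whose send() returns a CompletionStage. */
    public interface PriceSender {
        CompletionStage<Void> send(Double price);
    }

    /** Collected log lines (stand-in for a logger) so each outcome can be inspected. */
    public static final List<String> LOG = new ArrayList<>();

    /** Sends one record and attaches a callback, similar in spirit to Kafka's Callback.onCompletion. */
    public static CompletionStage<Void> sendWithCallback(PriceSender sender, Double price) {
        return sender.send(price).whenComplete((ignored, failure) -> {
            if (failure != null) {
                // The send was not acknowledged: handle or report the error here.
                LOG.add("failed " + price + ": " + failure.getMessage());
            } else {
                LOG.add("sent " + price);
            }
        });
    }

    public static void main(String[] args) {
        // A sender that always succeeds.
        PriceSender ok = price -> CompletableFuture.completedFuture(null);

        // A sender that always fails, simulating a negative acknowledgment.
        CompletableFuture<Void> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("broker unavailable"));
        PriceSender broken = price -> failed;

        sendWithCallback(ok, 1.5);
        sendWithCallback(broken, 2.5);
        LOG.forEach(System.out::println);
    }
}
```

In the resource from the question this would amount to something like priceEmitter.send(price).whenComplete(...), provided the injected Emitter is the MicroProfile one.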
