java—将异常从kafka订阅服务器向上游传递到kafka,让它通过重试队列自动重试

vtwuwzda  于 2021-06-04  发布在  Kafka
关注(0)|答案(0)|浏览(239)

我正在用KafkaReact堆项目订阅一个主题。我的kafka在抛出异常时设置了自动重试,但是使用React式kafka subscribe很难抛出异常。
我有一个使用Reactkafka流的Reactjava应用程序。应用程序正在订阅主题。我的应用程序需要执行一些操作,比如基于从kafka接收到的这个事件进行外部api调用。但如果这些操作失败了,我想把一个例外扔回Kafka。我将kafka中间件配置为在应用程序引发异常时重试。
项目React堆:https://projectreactor.io/docs/kafka/release/reference/

Map<String, Object> consumerProps;
consumerProps = new HashMap<>();
    consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServers);
    consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "TESTING");
    consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
    consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
receiverOptions =
        ReceiverOptions.<Integer, String>create(consumerProps)
            .subscription(Collections.singleton(topic));

Flux<ReceiverRecord<Integer, String>> inboundFlux =
        KafkaReceiver.create(receiverOptions)
            .receive();

inboundFlux
   .map(b -> processRecordThrowAnException(b))
   .subscribe(r -> {
      System.out.printf("Received message: %s\n", r);
      r.receiverOffset().acknowledge();
    });

private Mono<Void> processRecordThrowAnException(String payload) {
      return Mono.error(new Exception("processing message failed for order: " + traceId));
  }

如何将异常抛出回上游?

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题