我正在用KafkaReact堆项目订阅一个主题。我的kafka在抛出异常时设置了自动重试,但是使用React式kafka subscribe很难抛出异常。
我有一个使用Reactkafka流的Reactjava应用程序。应用程序正在订阅主题。我的应用程序需要执行一些操作,比如基于从kafka接收到的这个事件进行外部api调用。但如果这些操作失败了,我想把一个例外扔回Kafka。我将kafka中间件配置为在应用程序引发异常时重试。
项目React堆:https://projectreactor.io/docs/kafka/release/reference/
Map<String, Object> consumerProps;
consumerProps = new HashMap<>();
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServers);
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "TESTING");
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
receiverOptions =
ReceiverOptions.<Integer, String>create(consumerProps)
.subscription(Collections.singleton(topic));
Flux<ReceiverRecord<Integer, String>> inboundFlux =
KafkaReceiver.create(receiverOptions)
.receive();
inboundFlux
.map(b -> processRecordThrowAnException(b))
.subscribe(r -> {
System.out.printf("Received message: %s\n", r);
r.receiverOffset().acknowledge();
});
private Mono<Void> processRecordThrowAnException(String payload) {
return Mono.error(new Exception("processing message failed for order: " + traceId));
}
如何将异常抛出回上游?
暂无答案!
目前还没有任何答案,快来回答吧!