下面是从kafka主题(8分区)接收消息并对其进行处理的用户代码。
@Component
public class MessageConsumer {
private static final String TOPIC = "mytopic.t";
private static final String GROUP_ID = "mygroup";
private final ReceiverOptions consumerSettings;
private static final Logger LOG = LoggerFactory.getLogger(MessageConsumer.class);
@Autowired
public MessageConsumer(@Qualifier("consumerSettings") ReceiverOptions consumerSettings)
{
this.consumerSettings=consumerSettings;
consumerMessage();
}
private void consumerMessage()
{
KafkaReceiver<String, String> receiver = KafkaReceiver.create(receiverOptions(Collections.singleton(TOPIC)));
Scheduler scheduler = Schedulers.newElastic("FLUX_DEFER", 10, true);
Flux.defer(receiver::receive)
.groupBy(m -> m.receiverOffset().topicPartition())
.flatMap(partitionFlux ->
partitionFlux.publishOn(scheduler)
.concatMap(m -> {
LOG.info("message received from kafka : " + "key : " + m.key()+ " partition: " + m.partition());
return process(m.key(), m.value())
.thenEmpty(m.receiverOffset().commit());
}))
.retryBackoff(5, Duration.ofSeconds(2), Duration.ofHours(2))
.doOnError(err -> {
handleError(err);
}).retry()
.doOnCancel(() -> close()).subscribe();
}
private void close() {
}
private void handleError(Throwable err) {
LOG.error("kafka stream error : ",err);
}
private Mono<Void> process(String key, String value)
{
if(key.equals("error"))
return Mono.error(new Exception("process error : "));
LOG.error("message consumed : "+key);
return Mono.empty();
}
public ReceiverOptions<String, String> receiverOptions(Collection<String> topics) {
return consumerSettings
.commitInterval(Duration.ZERO)
.commitBatchSize(0)
.addAssignListener(p -> LOG.info("Group {} partitions assigned {}", GROUP_ID, p))
.addRevokeListener(p -> LOG.info("Group {} partitions assigned {}", GROUP_ID, p))
.subscription(topics);
}
}
@Bean(name="consumerSettings")
public ReceiverOptions<String, String> getConsumerSettings() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
props.put(ConsumerConfig.CLIENT_ID_CONFIG, GROUP_ID);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
props.put("max.block.ms", "3000");
props.put("request.timeout.ms", "3000");
return ReceiverOptions.create(props);
}
在接收到每条消息时,如果消费的消息处理成功,我的处理逻辑将返回空mono。
如果处理逻辑中没有返回错误,那么一切都按预期工作。
但是,如果我抛出一个错误来模拟某个特定消息的处理逻辑中的异常行为,那么我将无法处理导致异常的消息。流移动到下一个消息。
我想要实现的是,处理当前消息并提交偏移量(如果成功),然后移动到下一条记录。
如果处理消息时出现任何异常,请不要提交当前偏移量,并重试同一消息,直到成功。在当前消息成功之前不要移动到下一个消息。
请让我知道如何在不跳过消息的情况下处理进程失败,并使流从引发异常的偏移量开始。
当做,
维诺
2条答案
按热度按时间6pp0gazn1#
下面的代码适用于我。这样做的目的是重试配置的失败消息的次数,如果仍然失败,则将其移动到失败队列并提交消息。同时并发处理来自其他分区的消息。
如果来自特定分区的消息在配置的时间内失败,那么在延迟之后重新启动流,这样我们就可以通过不连续地命中它们来处理依赖性失败。
z3yyvxxp2#
创建不同的消费群体。
每个消费群体将与一个数据库相关。
创建您的使用者,以便他们只处理相关事件并将其推送到相关数据库。如果数据库关闭,则将使用者配置为无限时间重试。无论出于何种原因,如果你的消费者死亡,那么请确保他们从早期消费者离开的地方开始。在将数据提交到数据库并向kafka代理发送ack之后,您的消费者很可能立即死亡。您需要更新使用者代码,以确保您只处理一次消息(如果需要)。