至少一次-在多分区中处理故障和偏移

fgw7neuy  于 2021-06-04  发布在  Kafka
关注(0)|答案(2)|浏览(262)

下面是从kafka主题(8分区)接收消息并对其进行处理的用户代码。

@Component
    public class MessageConsumer {

        private static final String TOPIC = "mytopic.t";
        private static final String GROUP_ID = "mygroup";
        private final ReceiverOptions consumerSettings;
        private static final Logger LOG = LoggerFactory.getLogger(MessageConsumer.class);

        @Autowired
        public MessageConsumer(@Qualifier("consumerSettings") ReceiverOptions consumerSettings)
        {
            this.consumerSettings=consumerSettings;
            consumerMessage();
        }

        private void consumerMessage()
        {

        KafkaReceiver<String, String> receiver = KafkaReceiver.create(receiverOptions(Collections.singleton(TOPIC)));

        Scheduler scheduler = Schedulers.newElastic("FLUX_DEFER", 10, true);

        Flux.defer(receiver::receive)
                .groupBy(m -> m.receiverOffset().topicPartition())
                .flatMap(partitionFlux ->
                        partitionFlux.publishOn(scheduler)
                                .concatMap(m -> {
                                    LOG.info("message received from kafka : " + "key : " + m.key()+ " partition: " + m.partition());
                                    return process(m.key(), m.value())
                                            .thenEmpty(m.receiverOffset().commit());
                                }))
                .retryBackoff(5, Duration.ofSeconds(2), Duration.ofHours(2))
                .doOnError(err -> {
                    handleError(err);
                }).retry()
                .doOnCancel(() -> close()).subscribe();

    }

    private void close() {
    }

    private void handleError(Throwable err) {
        LOG.error("kafka stream error : ",err);
    }

    private Mono<Void> process(String key, String value)
    {
        if(key.equals("error"))
            return Mono.error(new Exception("process error : "));

        LOG.error("message consumed : "+key);
        return Mono.empty();
    }

    public ReceiverOptions<String, String> receiverOptions(Collection<String> topics) {
        return consumerSettings
                .commitInterval(Duration.ZERO)
                .commitBatchSize(0)
                .addAssignListener(p -> LOG.info("Group {} partitions assigned {}", GROUP_ID, p))
                .addRevokeListener(p -> LOG.info("Group {} partitions assigned {}", GROUP_ID, p))
                .subscription(topics);
    }

}
    @Bean(name="consumerSettings")
    public ReceiverOptions<String, String> getConsumerSettings() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
        props.put(ConsumerConfig.CLIENT_ID_CONFIG, GROUP_ID);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put("max.block.ms", "3000");
        props.put("request.timeout.ms", "3000");

        return ReceiverOptions.create(props);
    }

在接收到每条消息时,如果消费的消息处理成功,我的处理逻辑将返回空mono。
如果处理逻辑中没有返回错误,那么一切都按预期工作。
但是,如果我抛出一个错误来模拟某个特定消息的处理逻辑中的异常行为,那么我将无法处理导致异常的消息。流移动到下一个消息。
我想要实现的是,处理当前消息并提交偏移量(如果成功),然后移动到下一条记录。
如果处理消息时出现任何异常,请不要提交当前偏移量,并重试同一消息,直到成功。在当前消息成功之前不要移动到下一个消息。
请让我知道如何在不跳过消息的情况下处理进程失败,并使流从引发异常的偏移量开始。
当做,
维诺

6pp0gazn

6pp0gazn1#

下面的代码适用于我。这样做的目的是重试配置的失败消息的次数,如果仍然失败,则将其移动到失败队列并提交消息。同时并发处理来自其他分区的消息。
如果来自特定分区的消息在配置的时间内失败,那么在延迟之后重新启动流,这样我们就可以通过不连续地命中它们来处理依赖性失败。

@Autowired
public ReactiveMessageConsumer(@Qualifier("consumerSettings") ReceiverOptions consumerSettings,MessageProducer producer)
{
    this.consumerSettings=consumerSettings;
    this.fraudCheckService=fraudCheckService;
    this.producer=producer;
    consumerMessage();
}

private void consumerMessage() {

    int numRetries=3;

    Scheduler scheduler = Schedulers.newElastic("FLUX_DEFER", 10, true);

    KafkaReceiver<String, String> receiver = KafkaReceiver.create(receiverOptions(Collections.singleton(TOPIC)));

    Flux<GroupedFlux<TopicPartition, ReceiverRecord<String, String>>> f = Flux.defer(receiver::receive)
            .groupBy(m -> m.receiverOffset().topicPartition());

    Flux f1 = f.publishOn(scheduler).flatMap(r -> r.publishOn(scheduler).concatMap(b ->
            Flux.just(b)
                    .concatMap(a -> {
                        LOG.error("processing message - order: {} offset: {} partition: {}",a.key(),a.receiverOffset().offset(),a.receiverOffset().topicPartition().partition());

                        return process(a.key(), a.value()).
                                then(a.receiverOffset().commit())
                                .doOnSuccess(d -> LOG.info("committing  order {}: offset: {} partition: {} ",a.key(),a.receiverOffset().offset(),a.receiverOffset().topicPartition().partition()))
                                .doOnError(d -> LOG.info("committing offset failed for order {}: offset: {} partition: {} ",a.key(),a.receiverOffset().offset(),a.receiverOffset().topicPartition().partition()));
                    })
                    .retryWhen(companion -> companion
                            .doOnNext(s -> LOG.info(" --> Exception processing message for order {}: offset: {} partition: {} message: {} " , b.key() , b.receiverOffset().offset(),b.receiverOffset().topicPartition().partition(),s.getMessage()))
                            .zipWith(Flux.range(1, numRetries), (error, index) -> {
                                if (index < numRetries) {
                                    LOG.info(" --> Retying {} order: {} offset: {} partition: {} ", index, b.key(),b.receiverOffset().offset(),b.receiverOffset().topicPartition().partition());
                                    return index;
                                } else {
                                    LOG.info(" --> Retries Exhausted: {} - order: {} offset: {} partition: {}. Message moved to error queue. Commit and proceed to next", index, b.key(),b.receiverOffset().offset(),b.receiverOffset().topicPartition().partition());
                                    producer.sendMessages(ERROR_TOPIC,b.key(),b.value());
                                    b.receiverOffset().commit();
                                    //return index;
                                    throw Exceptions.propagate(error);
                                }
                            })
                            .flatMap(index -> Mono.delay(Duration.ofSeconds((long) Math.pow(1.5, index - 1) * 3)))
                            .doOnNext(s -> LOG.info(" --> Retried at: {} ", LocalTime.now()))
                    ))
    );

    f1.doOnError(a ->  {
                LOG.info("Moving to next message because of : ", a);
                try {

                    Thread.sleep(5000); // configurable
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }

    ).retry().subscribe(); 

}

public ReceiverOptions<String, String> receiverOptions(Collection<String> topics) {
    return consumerSettings
            .commitInterval(Duration.ZERO)
            .commitBatchSize(0)
            .addAssignListener(p -> LOG.info("Group {} partitions assigned {}", GROUP_ID, p))
            .addRevokeListener(p -> LOG.info("Group {} partitions assigned {}", GROUP_ID, p))
            .subscription(topics);
}

private Mono<Void> process(OrderId orderId, TraceId traceId)
{
    try {

        Thread.sleep(500); // simulate slow response
    } catch (InterruptedException e) {
        // Causes the restart
        e.printStackTrace();
    }

   if(orderId.getId().startsWith("error")) // simulate error scenario
        return Mono.error(new Exception("processing message failed for order: " + orderId.getId()));

    return Mono.empty();
}
z3yyvxxp

z3yyvxxp2#

创建不同的消费群体。
每个消费群体将与一个数据库相关。
创建您的使用者,以便他们只处理相关事件并将其推送到相关数据库。如果数据库关闭,则将使用者配置为无限时间重试。无论出于何种原因,如果你的消费者死亡,那么请确保他们从早期消费者离开的地方开始。在将数据提交到数据库并向kafka代理发送ack之后,您的消费者很可能立即死亡。您需要更新使用者代码,以确保您只处理一次消息(如果需要)。

相关问题