java—如何用锁存器 Package SeekTocurInterrorHandler处理的@kafkalistener以进行测试

fhity93d  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(434)

我有一个kafka侦听器,它抛出一个异常并转到seektocurinterrorhandler进行处理。我正在尝试 Package 侦听器以添加一个闩锁,并在其上Assert以进行测试。但似乎不起作用。有人能推荐一下吗。
侦听器:

@KafkaListener(id="testRetry",topics = "sr1", groupId = "retry-grp", containerFactory = "kafkaRetryListenerContainerFactory")
    public void listen1(ConsumerRecord<String, Anky> record,
            @Header(KafkaHeaders.DELIVERY_ATTEMPT) int delivery, Acknowledgment ack) throws UserException {
            throw new UserException(code.getDbError());

    }

配置:

@Bean
    public ConcurrentKafkaListenerContainerFactory<String, Anky> kafkaRetryListenerContainerFactory(SeekToCurrentErrorHandler seekToCurrentErrorHandler) {

        ConcurrentKafkaListenerContainerFactory<String, Anky> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
        // to get the no retry count from the header
        factory.getContainerProperties().setDeliveryAttemptHeader(true);
        factory.setErrorHandler(seekToCurrentErrorHandler);
        // to keep the consumers alive the failure gets reported to broker so that
        // consumers remain alive
        factory.setStatefulRetry(true);
        factory.setConcurrency(2);
        return factory;
    }

    @Bean
    public SeekToCurrentErrorHandler seekToCurrentErrorHandler(DeadLetterPublishingRecoverer deadLetterPublishingRecoverer) {
        SeekToCurrentErrorHandler errorHandler = new SeekToCurrentErrorHandler((record, exception) -> {
            //do nothing
        });
        errorHandler.setCommitRecovered(true);
        errorHandler.setBackOffFunction((record, exception) -> {
            return new FixedBackOff(0L,5L);

        });
        return errorHandler;
    }

测试:

@Test
public void testRetry() throws Exception,UserException {
    Anky msg = new Anky();
    this.template.send("sr1", "1234", msg);
    CountDownLatch latch = new CountDownLatch(1);
            ConcurrentMessageListenerContainer<?, ?> container = (ConcurrentMessageListenerContainer<?, ?>) kafkaListenerEndpointRegistry
            .getListenerContainer("testRetry");
    container.stop();
    @SuppressWarnings("unchecked")
    AcknowledgingConsumerAwareMessageListener<String, Anky> messageListener = (AcknowledgingConsumerAwareMessageListener<String, Anky>) container
            .getContainerProperties().getMessageListener();

    container.getContainerProperties()
            .setMessageListener(new AcknowledgingConsumerAwareMessageListener<String, Anky() {

                @Override
                public void onMessage(ConsumerRecord<String, Anky> data, Acknowledgment acknowledgment,
                        Consumer<?, ?> consumer) {
                    messageListener.onMessage(data, acknowledgment, consumer);
                    latch.countDown();
                }

            });
    container.start();
    assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();

}
nhjlsmyf

nhjlsmyf1#

添加 try...finally 阻止。

try {
    messageListener.onMessage(data, acknowledgment, consumer);
}
finally {
    latch.countDown();
}

相关问题