如何在 Spring 工作maxAge Kafka Factory Producer?

y0u0uwnf  于 2024-01-06  发布在  Apache
关注(0)|答案(1)|浏览(243)

我们正在使用transactional producers,有时我们发现自己处于超过7天没有流量的情况,这导致transactional id元数据丢失。在此时间之后的下一次写入总是会导致错误:

org.apache.kafka.common.errors.InvalidPidMappingException: The producer attempted to use a producer id which is not currently assigned to its transactional id.

字符串
目前的解决方法是重新启动Kubernetes中的所有示例。
查看文档,我看到在2.5.8版中添加了一个新的属性“maxAge”,它必须少于7天才能刷新代理端的元数据。我正在测试,没有办法让它按预期工作。
我已经建立了一个演示项目来模拟行为,除了我的一些错误,maxAge在以下时间之前不会刷新元数据:

transactional.id.expiration.ms


使用的版本:

  1. confluentinc/cp-kafka:latest - Kafka 3.4
  2. Sping Boot 3.1.5 JDK 17
  3. Kafka客户端3.4.1
  4. Spring Kafka 3.0.12
  5. Spring Stream Binder Kafka 4.0.4
    事务元数据将在10秒内强制过期。
KAFKA_TRANSACTIONAL_ID_EXPIRATION_MS: 10000
  KAFKA_PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS: 100
  KAFKA_PRODUCER_ID_EXPIRATION_MS: 10000
  KAFKA_TRANSACTION_ABORT_TIMED_OUT_TRANSACTION_CLEANUP_INTERVAL_MS: 10000
  KAFKA_TRANSACTION_REMOVE_EXPIRED_TRANSACTION_CLEANUP_INTERVAL_MS: 10000


将maxAge设置为7秒,期望它在10秒之前刷新,以获得额外的10秒:

@Configuration
@Slf4j
public class KafkaConfig {

    @Bean
    public DefaultKafkaProducerFactoryCustomizer producerFactoryCustomizer() {
        return producerFactory -> producerFactory.setMaxAge(Duration.ofSeconds(7));
    }

}


按需测试控制器到生产者消息

@RestController
public class TestController {

    private AtomicInteger semaphore = new AtomicInteger(0);

    @Autowired
    private StreamBridge streamBridge;

    @GetMapping(value = "/test")
    public void sequentialSimple() {
            streamBridge.send("sendTestData-out-0", "testMessage_" + semaphore.getAndIncrement());
    }

}


我希望永远不会遇到InvalidPidMappingException,但我总是看到它。

2023-11-19T13:22:57.234+01:00 DEBUG 28648 --- [fix_localhost-0] o.a.k.c.p.internals.TransactionManager   : [Producer clientId=producer-transactionTestPrefix_localhost-0, transactionalId=transactionTestPrefix_localhost-0] Transition from state COMMITTING_TRANSACTION to error state ABORTABLE_ERROR

org.apache.kafka.common.errors.InvalidPidMappingException: The producer attempted to use a producer id which is not currently assigned to its transactional id.


如果我没有处理ABORTABLE_ERROR(AfterRollbackProcessor)或DLQ,我也会丢失消息。我做错了什么?
Github演示项目:https://github.com/Fonexn/kafkaMaxAgeTesting
谢谢

解决方案

以这种方式将maxAge设置为每个活页夹,例如:

@Bean
KafkaTransactionManager customKafkaTransactionManager() {
    KafkaMessageChannelBinder kafka = (KafkaMessageChannelBinder)this.binderFactory.getBinder("kafka1", MessageChannel.class);
    DefaultKafkaProducerFactory<byte[], byte[]> producerFactory = (DefaultKafkaProducerFactory<byte[], byte[]>) kafka.getTransactionalProducerFactory();

    producerFactory.setMaxAge(Duration.ofSeconds(60));

    return new KafkaTransactionManager(producerFactory);
}

ewm0tg9j

ewm0tg9j1#

maxAge不会刷新元数据。当从该高速缓存中检索到事务生产者时,如果maxAge已超过,则关闭生产者,并检查该高速缓存中的下一个生产者。此操作将继续,直到找到年轻的生产者或该高速缓存为空。如果该高速缓存中的所有生产者都太旧,则创建新的生产者。
参见createTransactionalProducer()expire()

相关问题