我们正在使用transactional producers,有时我们发现自己处于超过7天没有流量的情况,这导致transactional id元数据丢失。在此时间之后的下一次写入总是会导致错误:
org.apache.kafka.common.errors.InvalidPidMappingException: The producer attempted to use a producer id which is not currently assigned to its transactional id.
字符串
目前的解决方法是重新启动Kubernetes中的所有示例。
查看文档,我看到在2.5.8版中添加了一个新的属性“maxAge”,它必须少于7天才能刷新代理端的元数据。我正在测试,没有办法让它按预期工作。
我已经建立了一个演示项目来模拟行为,除了我的一些错误,maxAge在以下时间之前不会刷新元数据:
transactional.id.expiration.ms
型
使用的版本:
- confluentinc/cp-kafka:latest - Kafka 3.4
- Sping Boot 3.1.5 JDK 17
- Kafka客户端3.4.1
- Spring Kafka 3.0.12
- Spring Stream Binder Kafka 4.0.4
事务元数据将在10秒内强制过期。
KAFKA_TRANSACTIONAL_ID_EXPIRATION_MS: 10000
KAFKA_PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS: 100
KAFKA_PRODUCER_ID_EXPIRATION_MS: 10000
KAFKA_TRANSACTION_ABORT_TIMED_OUT_TRANSACTION_CLEANUP_INTERVAL_MS: 10000
KAFKA_TRANSACTION_REMOVE_EXPIRED_TRANSACTION_CLEANUP_INTERVAL_MS: 10000
型
将maxAge设置为7秒,期望它在10秒之前刷新,以获得额外的10秒:
@Configuration
@Slf4j
public class KafkaConfig {
@Bean
public DefaultKafkaProducerFactoryCustomizer producerFactoryCustomizer() {
return producerFactory -> producerFactory.setMaxAge(Duration.ofSeconds(7));
}
}
型
按需测试控制器到生产者消息
@RestController
public class TestController {
private AtomicInteger semaphore = new AtomicInteger(0);
@Autowired
private StreamBridge streamBridge;
@GetMapping(value = "/test")
public void sequentialSimple() {
streamBridge.send("sendTestData-out-0", "testMessage_" + semaphore.getAndIncrement());
}
}
型
我希望永远不会遇到InvalidPidMappingException,但我总是看到它。
2023-11-19T13:22:57.234+01:00 DEBUG 28648 --- [fix_localhost-0] o.a.k.c.p.internals.TransactionManager : [Producer clientId=producer-transactionTestPrefix_localhost-0, transactionalId=transactionTestPrefix_localhost-0] Transition from state COMMITTING_TRANSACTION to error state ABORTABLE_ERROR
org.apache.kafka.common.errors.InvalidPidMappingException: The producer attempted to use a producer id which is not currently assigned to its transactional id.
型
如果我没有处理ABORTABLE_ERROR(AfterRollbackProcessor)或DLQ,我也会丢失消息。我做错了什么?
Github演示项目:https://github.com/Fonexn/kafkaMaxAgeTesting
谢谢
解决方案
以这种方式将maxAge
设置为每个活页夹,例如:
@Bean
KafkaTransactionManager customKafkaTransactionManager() {
KafkaMessageChannelBinder kafka = (KafkaMessageChannelBinder)this.binderFactory.getBinder("kafka1", MessageChannel.class);
DefaultKafkaProducerFactory<byte[], byte[]> producerFactory = (DefaultKafkaProducerFactory<byte[], byte[]>) kafka.getTransactionalProducerFactory();
producerFactory.setMaxAge(Duration.ofSeconds(60));
return new KafkaTransactionManager(producerFactory);
}
型
1条答案
按热度按时间ewm0tg9j1#
maxAge
不会刷新元数据。当从该高速缓存中检索到事务生产者时,如果maxAge
已超过,则关闭生产者,并检查该高速缓存中的下一个生产者。此操作将继续,直到找到年轻的生产者或该高速缓存为空。如果该高速缓存中的所有生产者都太旧,则创建新的生产者。参见
createTransactionalProducer()
和expire()
。