I am using Spring Kafka 2.2.2.RELEASE (org.apache.kafka:kafka-clients:jar:2.0.1) and Spring Boot (2.1.1). I cannot execute a transaction because my listener never gets its partitions assigned. I have created only the recommended configuration for a single consumer; I am trying to configure a transactional listener container with exactly-once processing.
I configured the producer with a transaction manager and a transactional id, and the consumer with isolation.level=read_committed.
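(For completeness: the consumer factory injected into the container factory below comes from Spring Boot auto-configuration and is not listed here; it is roughly equivalent to the following sketch, where the group id and deserializer setup are only illustrative and the relevant part is the isolation level.)

@Bean
public ConsumerFactory<String, MyObject> consumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "ingest");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
    // only read records from committed transactions
    props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
    return new DefaultKafkaConsumerFactory<>(props);
}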
@Bean(name = "producerFactory")
public ProducerFactory<String, MyObject> producerFactory() {
    Map<String, Object> configProps = new HashMap<>();
    configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
    configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
    configProps.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);
    configProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
    configProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "txApp");
    DefaultKafkaProducerFactory<String, MyObject> producerFactory = new DefaultKafkaProducerFactory<>(configProps);
    producerFactory.setTransactionIdPrefix("tx.");
    return producerFactory;
}
@Bean
public KafkaTransactionManager<?, ?> kafkaTransactionManager() {
    KafkaTransactionManager<?, ?> kafkaTransactionManager = new KafkaTransactionManager<>(producerFactory());
    // ...
    return kafkaTransactionManager;
}
@Bean(name = "appTemplate")
public KafkaTemplate<String, MyObject> kafkaTemplate() {
    KafkaTemplate<String, MyObject> kafkaTemplate = new KafkaTemplate<>(producerFactory());
    return kafkaTemplate;
}
// Consumer
@Bean("kafkaListenerContainerFactory")
public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
        ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
        ConsumerFactory<Object, Object> kafkaConsumerFactory,
        KafkaTransactionManager<?, ?> kafkaTransactionManager) {
    ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
    configurer.configure(factory, kafkaConsumerFactory);
    factory.getContainerProperties().setTransactionManager(kafkaTransactionManager());
    return factory;
}
// In the consumer
@KafkaListener(topics = "myTopic", groupId = "ingest", concurrency = "4")
public void listener(@Payload MyObject message,
        @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) throws ExecutionException, InterruptedException {
    ...
}

// In my producer
myTemplate.executeInTransaction(t -> t.send(kafkaConfig.getTopicName(), myMessage));
I expected to see the message arrive at my listener, but when I run the producer I get the following error:
22-07-2019 10:21:55.283 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] ERROR o.a.k.c.c.i.ConsumerCoordinator.onJoinComplete request.id= request.caller= - [Consumer clientId=consumer-2, groupId=ingest] User provided listener org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer$ListenerConsumerRebalanceListener failed on partition assignment
org.springframework.transaction.CannotCreateTransactionException: Could not create Kafka transaction; nested exception is org.apache.kafka.common.errors.TimeoutException: Timeout expired while initializing transactional state in 60000ms.
at org.springframework.kafka.transaction.KafkaTransactionManager.doBegin(KafkaTransactionManager.java:150)
at org.springframework.transaction.support.AbstractPlatformTransactionManager.getTransaction(AbstractPlatformTransactionManager.java:378)
at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:137)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer$ListenerConsumerRebalanceListener.onPartitionsAssigned(KafkaMessageListenerContainer.java:1657)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:283)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:422)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:352)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:337)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:343)
at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1218)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1175)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1154)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:719)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:676)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.errors.TimeoutException: Timeout expired while initializing transactional state in 60000ms.
1 Answer
Check the server logs; most likely you do not have enough replicas to support transactions (the default requirement is 3). If this is just a test, you can set it to 1; a sketch of the overrides appears at the end of this answer.
See the broker properties transaction.state.log.replication.factor and min.insync.replicas:
The replication factor for the transaction topic (set higher to ensure availability). Internal topic creation will fail until the cluster size meets this replication factor requirement.
and
When a producer sets acks to "all" (or "-1"), this configuration specifies the minimum number of replicas that must acknowledge a write for the write to be considered successful. If this minimum cannot be met, then the producer will raise an exception (either NotEnoughReplicas or NotEnoughReplicasAfterAppend). When used together, min.insync.replicas and acks allow you to enforce greater durability guarantees. A typical scenario would be to create a topic with a replication factor of 3, set min.insync.replicas to 2, and produce with acks of "all". This will ensure that the producer raises an exception if a majority of replicas do not receive a write.
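For a single-broker test, the overrides described above would look roughly like the following server.properties sketch. This is for test environments only, not a production recommendation; transaction.state.log.min.isr is not quoted above, but it defaults to 2 and usually also has to be lowered when there is only one broker.

# single-broker test overrides (do not use in production)
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
min.insync.replicas=1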