我试图检查kafka和db call(postgres)之间的事务状态,如果db调用失败,那么kafka不应该将特定消息发送到kafka主题。
我试图通过springboot应用程序中的@transactional注解来实现这一点。
但是,Kafka正在向这个主题发送信息,即使db调用失败了。从控制台日志中,我可以看到以下流程:
1) Kafka的过渡状态从 INITIALIZING
至 READY
, READY
至 IN_TRANSACTION
至 ABORTING_TRANSACTION
,已将生成请求发送到分区
2) 数据库调用失败(sql错误: 0, SQLState: 22001
)
3) Kafka从国家过渡 IN_TRANSACTION
至 ABORTING_TRANSACTION
所以我相信,当我们给予@transactional时,Kafka应该只在Kafka国家从 IN_TRANSACTION
至 COMMITTING_TRANSACTION.
```
@Service
public class TestService {
@Autowired
private TestDataService dataService;
@Autowired
private KafkaTemplate<Integer, Test> kafkaTemplate;
@Transactional
public Test send(Test entity) {
kafkaTemplate.sendDefault(entity);
dataService.save(entity);
return entity;
}
}
控制台日志:
2019-10-30 09:21:52.165 TRACE 8560 --- [nio-8080-exec-1] o.a.k.clients.producer.KafkaProducer : Starting the Kafka producer
2019-10-30 09:21:52.209 INFO 8560 --- [nio-8080-exec-1] o.a.k.clients.producer.KafkaProducer : Instantiated a transactional producer.
2019-10-30 09:21:52.209 INFO 8560 --- [nio-8080-exec-1] o.a.k.clients.producer.KafkaProducer : Overriding the default retries config to the recommended value of 2147483647 since the idempotent producer is enabled.
2019-10-30 09:21:52.209 INFO 8560 --- [nio-8080-exec-1] o.a.k.clients.producer.KafkaProducer : Overriding the default max.in.flight.requests.per.connection to 1 since idempontence is enabled.
2019-10-30 09:21:52.209 INFO 8560 --- [nio-8080-exec-1] o.a.k.clients.producer.KafkaProducer : Overriding the default acks to all since idempotence is enabled.
2019-10-30 09:21:52.281 DEBUG 8560 --- [ad | producer-1] o.a.k.clients.producer.internals.Sender : Starting Kafka producer I/O thread.
2019-10-30 09:21:52.286 INFO 8560 --- [nio-8080-exec-1] o.a.kafka.common.utils.AppInfoParser : Kafka version : 0.11.0.0
2019-10-30 09:21:52.288 INFO 8560 --- [nio-8080-exec-1] o.a.kafka.common.utils.AppInfoParser : Kafka commitId : cb8625948210849f
2019-10-30 09:21:52.290 DEBUG 8560 --- [nio-8080-exec-1] o.a.k.clients.producer.KafkaProducer : Kafka producer started
2019-10-30 09:21:52.292 DEBUG 8560 --- [nio-8080-exec-1] o.a.k.c.p.internals.TransactionManager : [TransactionalId my.transaction.id.0] Transition from state UNINITIALIZED to INITIALIZING
2019-10-30 09:21:52.292 INFO 8560 --- [nio-8080-exec-1] o.a.k.c.p.internals.TransactionManager : [TransactionalId my.transaction.id.0] ProducerId set to -1 with epoch -1
2019-10-30 09:21:52.296 DEBUG 8560 --- [nio-8080-exec-1] o.a.k.c.p.internals.TransactionManager : [TransactionalId my.transaction.id.0] Enqueuing transactional request (type=InitProducerIdRequest, transactionalId=my.transaction.id.0, transactionTimeoutMs=60000)
2019-10-30 09:21:52.297 TRACE 8560 --- [ad | producer-1] o.a.k.c.p.internals.TransactionManager : [TransactionalId my.transaction.id.0] Request (type=InitProducerIdRequest, transactionalId=my.transaction.id.0, transactionTimeoutMs=60000) dequeued for sending
2019-10-30 09:21:52.298 DEBUG 8560 --- [ad | producer-1] o.a.k.c.p.internals.TransactionManager : [TransactionalId my.transaction.id.0] Enqueuing transactional request (type=FindCoordinatorRequest, coordinatorKey=my.transaction.id.0, coordinatorType=TRANSACTION)
2019-10-30 09:21:52.299 DEBUG 8560 --- [ad | producer-1] o.a.k.c.p.internals.TransactionManager : [TransactionalId my.transaction.id.0] Enqueuing transactional request (type=InitProducerIdRequest, transactionalId=my.transaction.id.0, transactionTimeoutMs=60000)
2019-10-30 09:21:52.399 TRACE 8560 --- [ad | producer-1] o.a.k.c.p.internals.TransactionManager : [TransactionalId my.transaction.id.0] Request (type=FindCoordinatorRequest, coordinatorKey=my.transaction.id.0, coordinatorType=TRANSACTION) dequeued for sending
2019-10-30 09:21:52.553 DEBUG 8560 --- [ad | producer-1] o.a.k.clients.producer.internals.Sender : [TransactionalId my.transaction.id.0] Sending transactional request (type=FindCoordinatorRequest, coordinatorKey=my.transaction.id.0, coordinatorType=TRANSACTION) to node localhost:9092 (id: -1 rack: null)
2019-10-30 09:21:52.561 TRACE 8560 --- [ad | producer-1] o.a.k.c.p.internals.TransactionManager : [TransactionalId my.transaction.id.0] Received transactional response FindCoordinatorResponse(throttleTimeMs=0, errorMessage='null', error=NONE, node=LIN70000505.corp.capgemini.com:9092 (id: 0 rack: null)) for request (type=FindCoordinatorRequest, coordinatorKey=my.transaction.id.0, coordinatorType=TRANSACTION)
2019-10-30 09:21:52.561 TRACE 8560 --- [ad | producer-1] o.a.k.c.p.internals.TransactionManager : [TransactionalId my.transaction.id.0] Request (type=InitProducerIdRequest, transactionalId=my.transaction.id.0, transactionTimeoutMs=60000) dequeued for sending
2019-10-30 09:21:52.667 DEBUG 8560 --- [ad | producer-1] o.a.k.clients.producer.internals.Sender : [TransactionalId my.transaction.id.0] Sending transactional request (type=InitProducerIdRequest, transactionalId=my.transaction.id.0, transactionTimeoutMs=60000) to node LIN70000505.corp.capgemini.com:9092 (id: 0 rack: null)
2019-10-30 09:21:52.672 TRACE 8560 --- [ad | producer-1] o.a.k.c.p.internals.TransactionManager : [TransactionalId my.transaction.id.0] Received transactional response InitProducerIdResponse(error=NONE, producerId=3000, producerEpoch=13, throttleTimeMs=0) for request (type=InitProducerIdRequest, transactionalId=my.transaction.id.0, transactionTimeoutMs=60000)
2019-10-30 09:21:52.673 INFO 8560 --- [ad | producer-1] o.a.k.c.p.internals.TransactionManager : [TransactionalId my.transaction.id.0] ProducerId set to 3000 with epoch 13
2019-10-30 09:21:52.673 DEBUG 8560 --- [ad | producer-1] o.a.k.c.p.internals.TransactionManager : [TransactionalId my.transaction.id.0] Transition from state INITIALIZING to READY
2019-10-30 09:21:52.673 DEBUG 8560 --- [nio-8080-exec-1] o.a.k.c.p.internals.TransactionManager : [TransactionalId my.transaction.id.0] Transition from state READY to IN_TRANSACTION
2019-10-30 09:21:52.674 TRACE 8560 --- [nio-8080-exec-1] .s.t.s.TransactionSynchronizationManager : Bound value [org.springframework.kafka.core.KafkaResourceHolder@758f6aa7] for key [org.springframework.kafka.core.DefaultKafkaProducerFactory@126f8f24] to thread [http-nio-8080-exec-1]
2019-10-30 09:21:52.677 TRACE 8560 --- [nio-8080-exec-1] o.s.kafka.core.KafkaTemplate : Sending: ProducerRecord(topic=topic5, partition=null, headers=RecordHeaders(headers = [], isReadOnly = false), key=null, value=com.example.demo.entities.Test@347a5bd2, timestamp=null)
2019-10-30 09:21:52.686 TRACE 8560 --- [nio-8080-exec-1] o.a.k.clients.producer.KafkaProducer : Requesting metadata update for topic topic5.
2019-10-30 09:21:52.710 TRACE 8560 --- [nio-8080-exec-1] o.a.k.clients.producer.KafkaProducer : Sending record ProducerRecord(topic=topic5, partition=null, headers=RecordHeaders(headers = [], isReadOnly = true), key=null, value=com.example.demo.entities.Test@347a5bd2, timestamp=null) with callback org.springframework.kafka.core.KafkaTemplate$1@3028a6bd to topic topic5 partition 0
2019-10-30 09:21:52.710 DEBUG 8560 --- [nio-8080-exec-1] o.a.k.c.p.internals.TransactionManager : [TransactionalId my.transaction.id.0] Begin adding new partition topic5-0 to transaction
2019-10-30 09:21:52.710 TRACE 8560 --- [nio-8080-exec-1] o.a.k.c.p.internals.RecordAccumulator : Allocating a new 16384 byte message buffer for topic topic5 partition 0
2019-10-30 09:21:52.720 TRACE 8560 --- [nio-8080-exec-1] o.a.k.clients.producer.KafkaProducer : Waking up the sender since topic topic5 partition 0 is either full or getting a new batch
2019-10-30 09:21:52.720 TRACE 8560 --- [nio-8080-exec-1] o.s.kafka.core.KafkaTemplate : Sent: ProducerRecord(topic=topic5, partition=null, headers=RecordHeaders(headers = [], isReadOnly = true), key=null, value=com.example.demo.entities.Test@347a5bd2, timestamp=null)
After Kafka call
2019-10-30 09:21:52.723 DEBUG 8560 --- [ad | producer-1] o.a.k.c.p.internals.TransactionManager : [TransactionalId my.transaction.id.0] Enqueuing transactional request (type=AddPartitionsToTxnRequest, transactionalId=my.transaction.id.0, producerId=3000, producerEpoch=13, partitions=[topic5-0])
2019-10-30 09:21:52.723 TRACE 8560 --- [ad | producer-1] o.a.k.c.p.internals.TransactionManager : [TransactionalId my.transaction.id.0] Request (type=AddPartitionsToTxnRequest, transactionalId=my.transaction.id.0, producerId=3000, producerEpoch=13, partitions=[topic5-0]) dequeued for sending
2019-10-30 09:21:52.724 DEBUG 8560 --- [ad | producer-1] o.a.k.clients.producer.internals.Sender : [TransactionalId my.transaction.id.0] Sending transactional request (type=AddPartitionsToTxnRequest, transactionalId=my.transaction.id.0, producerId=3000, producerEpoch=13, partitions=[topic5-0]) to node LIN70000505.corp.capgemini.com:9092 (id: 0 rack: null)
2019-10-30 09:21:52.728 TRACE 8560 --- [nio-8080-exec-1] .s.t.s.TransactionSynchronizationManager : Bound value [org.springframework.data.jpa.repository.support.CrudMethodMetadataPostProcessor$DefaultCrudMethodMetadata@4a455630] for key [public abstract java.lang.Object org.springframework.data.repository.CrudRepository.save(java.lang.Object)] to thread [http-nio-8080-exec-1]
2019-10-30 09:21:52.730 TRACE 8560 --- [ad | producer-1] o.a.k.c.p.internals.TransactionManager : [TransactionalId my.transaction.id.0] Received transactional response AddPartitionsToTxnResponse(errors={topic5-0=NONE}, throttleTimeMs=0) for request (type=AddPartitionsToTxnRequest, transactionalId=my.transaction.id.0, producerId=3000, producerEpoch=13, partitions=[topic5-0])
2019-10-30 09:21:52.731 DEBUG 8560 --- [ad | producer-1] o.a.k.c.p.internals.TransactionManager : [TransactionalId my.transaction.id.0] Successfully added partitions [topic5-0] to transaction
2019-10-30 09:21:52.731 DEBUG 8560 --- [ad | producer-1] o.a.k.c.p.internals.RecordAccumulator : Assigning sequence number 0 from producer (producerId=3000, epoch=13) to dequeued batch from partition topic5-0 bound for LIN70000505.corp.capgemini.com:9092 (id: 0 rack: null).
2019-10-30 09:21:52.732 TRACE 8560 --- [nio-8080-exec-1] .s.t.s.TransactionSynchronizationManager : Retrieved value [org.springframework.orm.jpa.EntityManagerHolder@169784b8] for key [org.springframework.orm.jpa.LocalContainerEntityManagerFactoryBean@e7b265e] bound to thread [http-nio-8080-exec-1]
2019-10-30 09:21:52.732 TRACE 8560 --- [nio-8080-exec-1] .s.t.s.TransactionSynchronizationManager : Retrieved value [org.springframework.jdbc.datasource.ConnectionHolder@7903817d] for key [HikariDataSource (testdb)] bound to thread [http-nio-8080-exec-1]
2019-10-30 09:21:52.732 TRACE 8560 --- [nio-8080-exec-1] o.s.t.i.TransactionInterceptor : Getting transaction for [org.springframework.data.jpa.repository.support.SimpleJpaRepository.save]
2019-10-30 09:21:52.736 TRACE 8560 --- [ad | producer-1] o.a.k.clients.producer.internals.Sender : Nodes with data ready to send: [LIN70000505.corp.capgemini.com:9092 (id: 0 rack: null)]
2019-10-30 09:21:52.740 TRACE 8560 --- [nio-8080-exec-1] .s.t.s.TransactionSynchronizationManager : Retrieved value [org.springframework.orm.jpa.EntityManagerHolder@169784b8] for key [org.springframework.orm.jpa.LocalContainerEntityManagerFactoryBean@e7b265e] bound to thread [http-nio-8080-exec-1]
2019-10-30 09:21:52.741 TRACE 8560 --- [ad | producer-1] o.a.k.clients.producer.internals.Sender : Sent produce request to 0: (type=ProduceRequest, magic=2, acks=-1, timeout=30000, partitionRecords=({topic5-0=[(record=DefaultRecord(offset=0, timestamp=1572427312710, key=0 bytes, value=38 bytes))]}), transactionalId='my.transaction.id.0'
2019-10-30 09:21:52.752 TRACE 8560 --- [ad | producer-1] o.a.k.clients.producer.internals.Sender : Received produce response from node 0 with correlation id 6
2019-10-30 09:21:52.752 DEBUG 8560 --- [ad | producer-1] o.a.k.clients.producer.internals.Sender : Incremented sequence number for topic-partition topic5-0 to 1
2019-10-30 09:21:52.752 TRACE 8560 --- [ad | producer-1] o.a.k.c.p.internals.ProducerBatch : Successfully produced messages to topic5-0 with base offset 48.
2019-10-30 09:21:52.801 TRACE 8560 --- [nio-8080-exec-1] o.s.t.i.TransactionInterceptor : Completing transaction for [org.springframework.data.jpa.repository.support.SimpleJpaRepository.save]
2019-10-30 09:21:52.801 TRACE 8560 --- [nio-8080-exec-1] .s.t.s.TransactionSynchronizationManager : Removed value [org.springframework.data.jpa.repository.support.CrudMethodMetadataPostProcessor$DefaultCrudMethodMetadata@4a455630] for key [public abstract java.lang.Object org.springframework.data.repository.CrudRepository.save(java.lang.Object)] from thread [http-nio-8080-exec-1]
After DB call
2019-10-30 09:21:52.801 TRACE 8560 --- [nio-8080-exec-1] o.s.t.i.TransactionInterceptor : Completing transaction for [com.example.demo.service.TestService.send]
2019-10-30 09:21:52.802 TRACE 8560 --- [nio-8080-exec-1] .s.t.s.TransactionSynchronizationManager : Removed value [org.springframework.kafka.core.KafkaResourceHolder@758f6aa7] for key [org.springframework.kafka.core.DefaultKafkaProducerFactory@126f8f24] from thread [http-nio-8080-exec-1]
2019-10-30 09:21:52.830 WARN 8560 --- [nio-8080-exec-1] o.h.engine.jdbc.spi.SqlExceptionHelper : SQL Error: 0, SQLState: 22001
2019-10-30 09:21:52.830 ERROR 8560 --- [nio-8080-exec-1] o.h.engine.jdbc.spi.SqlExceptionHelper : ERROR: value too long for type character varying(10)
2019-10-30 09:21:52.831 INFO 8560 --- [nio-8080-exec-1] o.h.e.j.b.internal.AbstractBatchImpl : HHH000010: On release of batch it still contained JDBC statements
2019-10-30 09:21:52.833 ERROR 8560 --- [nio-8080-exec-1] o.h.i.ExceptionMapperStandardImpl : HHH000346: Error during managed flush [org.hibernate.exception.DataException: could not execute statement]
2019-10-30 09:21:52.834 TRACE 8560 --- [nio-8080-exec-1] .s.t.s.TransactionSynchronizationManager : Clearing transaction synchronization
2019-10-30 09:21:52.835 DEBUG 8560 --- [nio-8080-exec-1] o.a.k.c.p.internals.TransactionManager : [TransactionalId my.transaction.id.0] Transition from state IN_TRANSACTION to ABORTING_TRANSACTION
2019-10-30 09:21:52.836 DEBUG 8560 --- [nio-8080-exec-1] o.a.k.c.p.internals.TransactionManager : [TransactionalId my.transaction.id.0] Enqueuing transactional request (type=EndTxnRequest, transactionalId=my.transaction.id.0, producerId=3000, producerEpoch=13, result=ABORT)
2019-10-30 09:21:52.836 TRACE 8560 --- [ad | producer-1] o.a.k.c.p.internals.TransactionManager : [TransactionalId my.transaction.id.0] Request (type=EndTxnRequest, transactionalId=my.transaction.id.0, producerId=3000, producerEpoch=13, result=ABORT) dequeued for sending
2019-10-30 09:21:52.836 DEBUG 8560 --- [ad | producer-1] o.a.k.clients.producer.internals.Sender : [TransactionalId my.transaction.id.0] Sending transactional request (type=EndTxnRequest, transactionalId=my.transaction.id.0, producerId=3000, producerEpoch=13, result=ABORT) to node LIN70000505.corp.capgemini.com:9092 (id: 0 rack: null)
2019-10-30 09:21:52.839 TRACE 8560 --- [ad | producer-1] o.a.k.c.p.internals.TransactionManager : [TransactionalId my.transaction.id.0] Received transactional response EndTxnResponse(error=NONE, throttleTimeMs=0) for request (type=EndTxnRequest, transactionalId=my.transaction.id.0, producerId=3000, producerEpoch=13, result=ABORT)
2019-10-30 09:21:52.839 DEBUG 8560 --- [ad | producer-1] o.a.k.c.p.internals.TransactionManager : [TransactionalId my.transaction.id.0] Transition from state ABORTING_TRANSACTION to READY
2019-10-30 09:21:52.839 TRACE 8560 --- [nio-8080-exec-1] .s.t.s.TransactionSynchronizationManager : Removed value [org.springframework.jdbc.datasource.ConnectionHolder@7903817d] for key [HikariDataSource (testdb)] from thread [http-nio-8080-exec-1]
2019-10-30 09:21:52.844 TRACE 8560 --- [nio-8080-exec-1] .s.t.s.TransactionSynchronizationManager : Removed value [org.springframework.orm.jpa.EntityManagerHolder@169784b8] for key [org.springframework.orm.jpa.LocalContainerEntityManagerFactoryBean@e7b265e] from thread [http-nio-8080-exec-1]
2019-10-30 09:21:52.854 ERROR 8560 --- [nio-8080-exec-1] o.a.c.c.C.[.[.[/].[dispatcherServlet] : Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Request processing failed; nested exception is org.springframework.dao.DataIntegrityViolationException: could not execute statement; SQL [n/a]; nested exception is org.hibernate.exception.DataException: could not execute statement] with root cause
org.postgresql.util.PSQLException: ERROR: value too long for type character varying(10)
1条答案
按热度按时间7lrncoxx1#
kafka不支持事务-至少不支持涉及多个kafka集群的事务。