kafkastream在很长一段时间后未能产生数据(超过设定的过期时间)
甚至Kafka斯特伦在记录错误信息后也死了。
例外情况如下:
org.apache.kafka.common.KafkaException: Cannot execute transactional method because we are in an error state
at org.apache.kafka.clients.producer.internals.TransactionManager.maybeFailWithError(TransactionManager.java:784)
at org.apache.kafka.clients.producer.internals.TransactionManager.beginAbort(TransactionManager.java:229)
at org.apache.kafka.clients.producer.KafkaProducer.abortTransaction(KafkaProducer.java:660)
at org.apache.kafka.streams.processor.internals.StreamTask.closeSuspended(StreamTask.java:493)
at org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:553)
at org.apache.kafka.streams.processor.internals.AssignedTasks.close(AssignedTasks.java:405)
at org.apache.kafka.streams.processor.internals.TaskManager.shutdown(TaskManager.java:260)
at org.apache.kafka.streams.processor.internals.StreamThread.completeShutdown(StreamThread.java:1111)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:730)
org.apache.kafka.common.KafkaException: Unexpected error in AddOffsetsToTxnResponse: The producer attempted to use a producer id which is not currently assigned to its transactional id
at org.apache.kafka.clients.producer.internals.TransactionManager$AddOffsetsToTxnHandler.handleResponse(TransactionManager.java:1237)
at org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:907)
at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:101)
at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:482)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:474)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:216)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163)
at java.lang.Thread.run(Thread.java:834)
版本:
Kafka经纪人:2.0.0
Kafka客户:1.1.1
Kafka河:1.1.1
(经纪人和制片人)选项都是默认的:
事务\u超时\u配置
事务性.id.expiration.ms
transaction.max.timeout.ms
代码:
Properties properties = new Properties();
properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
properties.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
StreamsBuilder builder = new StreamsBuilder();
builder.stream("from", Consumed.with(Serdes.Integer(), Serdes.String()))
.peek((key, value) -> System.out.println(value))
.to("to", Produced.with(Serdes.Integer(), Serdes.String()), (key, value, numPartitions) -> key % numPartitions));
KafkaStreams streams = new KafkaStreams(bulider.build(), properties);
stream.start();
1条答案
按热度按时间mwngjboj1#
从错误消息来看,这里似乎有几个未知问题:
在生产商内部,我们不专门处理
INVALID_PRODUCER_ID_MAPPING
在AddOffsetsToTxnHandler#handleResponse
,这是一个致命错误,将抛出kafkaexception。在流中,我们吞下了producerfencedexception,但由于1)抛出了一个致命的kafkae异常,导致它直接消失。
1)的行为是经过设计的,但我承认,作为事后考虑,它确实有一些问题:
答。一般来说,生产商对案件进行了限制,包括
INVALID_PRODUCER_ID_MAPPING
应该比上面的更好。这被称为https://cwiki.apache.org/confluence/display/kafka/kip-360%3a+improve+handling+of+unknown+producerb。txn producer应该更好地区分“致命”错误和非致命错误,后者应该在内部处理,而不是交给调用者。一个简单的想法是,除了producerfenced错误之外,我们迄今为止设计的所有其他错误都应该被视为非致命错误,因此应该在内部处理。