KafkaProducer issue - producer closed with timeoutMillis = 9223372036854775807 ms

hec6srdp · posted 2021-06-04 in Kafka

I am having a bit of a problem with my Kafka producer, and I have not been able to find a solution. Or at least I think it is a problem.
I have set up a simple Flink job that reads from a Kafka topic using

kafkaProps.setProperty("isolation.level", "read_committed")

because I want exactly-once data in my application.
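
For context, the consumer side is set up roughly like this (broker address, group id, topic name and deserialization schema below are placeholders, not my actual code):

import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

Properties kafkaProps = new Properties();
kafkaProps.setProperty("bootstrap.servers", "localhost:9092");  // placeholder
kafkaProps.setProperty("group.id", "player-session-enricher");  // placeholder
// Only return records from committed transactions, so the job never
// sees data from transactions that were aborted upstream.
kafkaProps.setProperty("isolation.level", "read_committed");

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.addSource(new FlinkKafkaConsumer<>("playerSessions", new SimpleStringSchema(), kafkaProps));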
After doing some enrichment of the data I read from Kafka, I set up the following producer:

FlinkKafkaProducer<ObjectNode> kafkaProducer = new FlinkKafkaProducer<ObjectNode>(
        "enrichedPlayerSessionsTest",                          // target topic
        new KafkaJsonSerialize("enrichedPlayerSessionsTest"),  // custom JSON serialization schema
        producerProperties,
        FlinkKafkaProducer.Semantic.EXACTLY_ONCE               // transactional writes tied to Flink checkpoints
);
kafkaProducer.setWriteTimestampToKafka(true);  // propagate Flink record timestamps into Kafka

The producer above is then added as a sink at the end of the Flink job so that it writes to the Kafka cluster. The cluster in question is version 2.2 and runs with default settings, except for the following:

transaction.max.timeout.ms=3600000
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
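
As far as I understand, FlinkKafkaProducer with Semantic.EXACTLY_ONCE sets transaction.timeout.ms to one hour by default, while brokers ship with transaction.max.timeout.ms of 15 minutes, which is why I raised the broker setting above. A sketch of how producerProperties is built (broker address is a placeholder):

import java.util.Properties;

Properties producerProperties = new Properties();
producerProperties.setProperty("bootstrap.servers", "localhost:9092");  // placeholder
// Must not exceed the broker's transaction.max.timeout.ms (3600000 above),
// or the broker rejects the producer when it initializes its transactions.
producerProperties.setProperty("transaction.timeout.ms", "3600000");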

Now, when I run this application I get the following messages:

13:44:40,758 INFO  org.apache.kafka.clients.producer.internals.TransactionManager  - [Producer clientId=producer-6, transactionalId=Source: playerSession and playserSessionStarted from Kafka -> Filter out playerSessionStarted -> Extract PlayerSession -> Set MDC for event -> Map -> (Wrap playerSessionEnriched into EntityEventBatch, Sink: Post sessionEnriched to Kafka)-ac09b6abdcafcb1e86b87abd220f2a9d-2] ProducerId set to 21280 with epoch 4
13:44:40,759 INFO  org.apache.kafka.clients.producer.KafkaProducer               - [Producer clientId=producer-6, transactionalId=Source: playerSession and playserSessionStarted from Kafka -> Filter out playerSessionStarted -> Extract PlayerSession -> Set MDC for event -> Map -> (Wrap playerSessionEnriched into EntityEventBatch, Sink: Post sessionEnriched to Kafka)-ac09b6abdcafcb1e86b87abd220f2a9d-2] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.

Sometimes I also see:

13:44:43,740 INFO  org.apache.kafka.clients.producer.internals.TransactionManager  - [Producer clientId=producer-26, transactionalId=Source: playerSession and playserSessionStarted from Kafka -> Filter out playerSessionStarted -> Extract PlayerSession -> Set MDC for event -> Map -> (Wrap playerSessionEnriched into EntityEventBatch, Sink: Post sessionEnriched to Kafka)-ac09b6abdcafcb1e86b87abd220f2a9d-0] ProducerId set to -1 with epoch -1
13:44:44,136 INFO  org.apache.kafka.clients.producer.internals.TransactionManager  - [Producer clientId=producer-26, transactionalId=Source: playerSession and playserSessionStarted from Kafka -> Filter out playerSessionStarted -> Extract PlayerSession -> Set MDC for event -> Map -> (Wrap playerSessionEnriched into EntityEventBatch, Sink: Post sessionEnriched to Kafka)-ac09b6abdcafcb1e86b87abd220f2a9d-0] ProducerId set to 21297 with epoch 11
13:44:44,147 INFO  org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase  - Consumer subtask 0 has no restore state.

Now, since this is not an error, the job does not crash while running, and the data is written to Kafka even when this message shows up. Still, it seems strange that the producer is closed with a timeout like this, and that this behaviour then repeats several times. But maybe this is just the expected behaviour when using a transactional KafkaProducer?
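One detail I did figure out: 9223372036854775807 is Long.MAX_VALUE, and as far as I can tell the no-arg KafkaProducer.close() just delegates to a close with an effectively unbounded timeout, so the log line by itself only says the producer is being closed without a deadline, not that anything actually timed out:

import java.time.Duration;

// Roughly what the no-arg close() does internally (my reading of the client code):
// Long.MAX_VALUE == 9223372036854775807, the value printed in the log line.
producer.close(Duration.ofMillis(Long.MAX_VALUE));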
All the code used for this application lives here: https://github.com/drthyme/flinkkafkaproducerproblem In the repo you will also find a gist with the complete application log from one of my runs.
Thankful for any insight into this.
