I have a bit of a problem with my Kafka producer that I haven't been able to find a solution to. Or at least, I think it is a problem.
I have set up a simple Flink job that reads from a Kafka topic, using
kafkaProps.setProperty("isolation.level", "read_committed")
because I want exactly-once semantics in my application.
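For context, the consumer side looks roughly like this (a minimal sketch; the bootstrap address, group id, source topic name, and deserialization schema are assumptions, only the isolation.level line is the setting quoted above):

import java.util.Properties;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema;

Properties kafkaProps = new Properties();
kafkaProps.setProperty("bootstrap.servers", "localhost:9092"); // assumption
kafkaProps.setProperty("group.id", "player-sessions");         // assumption
// Only read records from committed transactions:
kafkaProps.setProperty("isolation.level", "read_committed");

FlinkKafkaConsumer<ObjectNode> kafkaConsumer = new FlinkKafkaConsumer<>(
        "playerSessions",                             // assumption: source topic name
        new JSONKeyValueDeserializationSchema(false), // ObjectNode deserializer shipped with the connector
        kafkaProps
);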
After doing some enrichment of the data I read from Kafka, I set up the following producer:
FlinkKafkaProducer<ObjectNode> kafkaProducer = new FlinkKafkaProducer<ObjectNode>(
        "enrichedPlayerSessionsTest",
        new KafkaJsonSerialize("enrichedPlayerSessionsTest"),
        producerProperties,
        FlinkKafkaProducer.Semantic.EXACTLY_ONCE
);
kafkaProducer.setWriteTimestampToKafka(true);
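KafkaJsonSerialize is a small custom serialization schema; roughly, it is a KafkaSerializationSchema<ObjectNode> along these lines (a sketch only, the exact implementation is in the repo linked below and may differ in details):

import java.nio.charset.StandardCharsets;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

public class KafkaJsonSerialize implements KafkaSerializationSchema<ObjectNode> {
    private final String topic;

    public KafkaJsonSerialize(String topic) {
        this.topic = topic;
    }

    @Override
    public ProducerRecord<byte[], byte[]> serialize(ObjectNode element, Long timestamp) {
        // Serialize the enriched JSON node to UTF-8 bytes; no record key is set.
        byte[] value = element.toString().getBytes(StandardCharsets.UTF_8);
        return new ProducerRecord<>(topic, null, timestamp, null, value);
    }
}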
The producer above is then added as a sink at the end of the Flink job so that it writes to the Kafka cluster. The cluster in question is on version 2.2 and runs with the default settings, except for the following:
transaction.max.timeout.ms=3600000
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
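The transaction.max.timeout.ms override matters because, under Semantic.EXACTLY_ONCE, the Flink connector defaults the producer's transaction.timeout.ms to one hour, and that value must not exceed the broker's transaction.max.timeout.ms. A minimal sketch of producerProperties consistent with the broker settings above (the bootstrap address and the chosen timeout value are assumptions):

import java.util.Properties;

Properties producerProperties = new Properties();
producerProperties.setProperty("bootstrap.servers", "localhost:9092"); // assumption
// Must be <= the broker's transaction.max.timeout.ms (3600000 above);
// the Flink connector defaults this to one hour under EXACTLY_ONCE.
producerProperties.setProperty("transaction.timeout.ms", "900000");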
Now, when I run this application, I get the following messages:
13:44:40,758 INFO org.apache.kafka.clients.producer.internals.TransactionManager - [Producer clientId=producer-6, transactionalId=Source: playerSession and playserSessionStarted from Kafka -> Filter out playerSessionStarted -> Extract PlayerSession -> Set MDC for event -> Map -> (Wrap playerSessionEnriched into EntityEventBatch, Sink: Post sessionEnriched to Kafka)-ac09b6abdcafcb1e86b87abd220f2a9d-2] ProducerId set to 21280 with epoch 4
13:44:40,759 INFO org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-6, transactionalId=Source: playerSession and playserSessionStarted from Kafka -> Filter out playerSessionStarted -> Extract PlayerSession -> Set MDC for event -> Map -> (Wrap playerSessionEnriched into EntityEventBatch, Sink: Post sessionEnriched to Kafka)-ac09b6abdcafcb1e86b87abd220f2a9d-2] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
Sometimes I also see:
13:44:43,740 INFO org.apache.kafka.clients.producer.internals.TransactionManager - [Producer clientId=producer-26, transactionalId=Source: playerSession and playserSessionStarted from Kafka -> Filter out playerSessionStarted -> Extract PlayerSession -> Set MDC for event -> Map -> (Wrap playerSessionEnriched into EntityEventBatch, Sink: Post sessionEnriched to Kafka)-ac09b6abdcafcb1e86b87abd220f2a9d-0] ProducerId set to -1 with epoch -1
13:44:44,136 INFO org.apache.kafka.clients.producer.internals.TransactionManager - [Producer clientId=producer-26, transactionalId=Source: playerSession and playserSessionStarted from Kafka -> Filter out playerSessionStarted -> Extract PlayerSession -> Set MDC for event -> Map -> (Wrap playerSessionEnriched into EntityEventBatch, Sink: Post sessionEnriched to Kafka)-ac09b6abdcafcb1e86b87abd220f2a9d-0] ProducerId set to 21297 with epoch 11
13:44:44,147 INFO org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase - Consumer subtask 0 has no restore state.
Now, since this is not an error, the job does not crash while running, and the data is written to Kafka even when these messages appear. Still, it does seem strange that the producer is closed and that this behavior then repeats several times. But perhaps that is simply the expected behavior when using a transactional Kafka producer?
All the code used for this application can be found here: https://github.com/drthyme/flinkkafkaproducerproblem. In the repo you will also find a gist with the full application log from one of my runs.
Thanks for any insight into this.