Flink KafkaSink takes a very long time to start

Posted by 20jt8wwn on 2023-05-12 in Apache

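My Flink application behaves annoyingly when it uses a KafkaSink. If the application contains a sink to Kafka (EXACTLY_ONCE delivery), it takes a very long time to start. If I remove the Kafka sink (leaving the others) or replace it with a print sink, the application starts in a few seconds. In the TaskManager logs I see thousands of repeated lines like these: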

2023-04-05 14:01:25,828 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] ProducerId set to 14788 with epoch 9
2023-04-05 14:01:25,828 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Invoking InitProducerId for the first time in order to acquire a producer ID
2023-04-05 14:01:25,829 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Discovered transaction coordinator b-2.mykafkacluster.amazonaws.com:9096 (id: 2 rack: null)
2023-04-05 14:01:25,932 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] ProducerId set to 15843 with epoch 8
2023-04-05 14:01:25,932 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Invoking InitProducerId for the first time in order to acquire a producer ID
2023-04-05 14:01:25,933 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Discovered transaction coordinator b-3.mykafkacluster.amazonaws.com:9096 (id: 3 rack: null)
2023-04-05 14:01:26,035 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] ProducerId set to 16840 with epoch 7
2023-04-05 14:01:26,035 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Invoking InitProducerId for the first time in order to acquire a producer ID
2023-04-05 14:01:26,036 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Discovered transaction coordinator b-1.mykafkacluster.amazonaws.com:9096 (id: 1 rack: null)
2023-04-05 14:01:26,139 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] ProducerId set to 14813 with epoch 6
2023-04-05 14:01:26,139 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Invoking InitProducerId for the first time in order to acquire a producer ID
2023-04-05 14:01:26,140 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Discovered transaction coordinator b-3.mykafkacluster.amazonaws.com:9096 (id: 3 rack: null)
2023-04-05 14:01:26,244 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] ProducerId set to 16858 with epoch 5
2023-04-05 14:01:26,244 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Invoking InitProducerId for the first time in order to acquire a producer ID
2023-04-05 14:01:26,245 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Discovered transaction coordinator b-1.mykafkacluster.amazonaws.com:9096 (id: 1 rack: null)
2023-04-05 14:01:26,348 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] ProducerId set to 14832 with epoch 4
2023-04-05 14:01:26,348 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Invoking InitProducerId for the first time in order to acquire a producer ID
2023-04-05 14:01:26,349 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Discovered transaction coordinator b-2.mykafkacluster.amazonaws.com:9096 (id: 2 rack: null)
2023-04-05 14:01:26,451 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] ProducerId set to 15886 with epoch 3
2023-04-05 14:01:26,452 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Invoking InitProducerId for the first time in order to acquire a producer ID
2023-04-05 14:01:26,453 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Discovered transaction coordinator b-3.mykafkacluster.amazonaws.com:9096 (id: 3 rack: null)
2023-04-05 14:01:26,555 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] ProducerId set to 16891 with epoch 2
2023-04-05 14:01:26,555 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Invoking InitProducerId for the first time in order to acquire a producer ID
2023-04-05 14:01:26,556 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Discovered transaction coordinator b-1.mykafkacluster.amazonaws.com:9096 (id: 1 rack: null)
2023-04-05 14:01:26,659 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] ProducerId set to 14860 with epoch 1
2023-04-05 14:01:26,660 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Invoking InitProducerId for the first time in order to acquire a producer ID
2023-04-05 14:01:26,660 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Discovered transaction coordinator b-2.mykafkacluster.amazonaws.com:9096 (id: 2 rack: null)
2023-04-05 14:01:26,766 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] ProducerId set to 15917 with epoch 0
2023-04-05 14:01:26,767 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Invoking InitProducerId for the first time in order to acquire a producer ID
2023-04-05 14:01:26,767 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Discovered transaction coordinator b-1.mykafkacluster.amazonaws.com:9096 (id: 1 rack: null)
2023-04-05 14:01:26,870 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] ProducerId set to 14713 with epoch 23
2023-04-05 14:01:26,870 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Invoking InitProducerId for the first time in order to acquire a producer ID
2023-04-05 14:01:26,871 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Discovered transaction coordinator b-2.mykafkacluster.amazonaws.com:9096 (id: 2 rack: null)
2023-04-05 14:01:26,974 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] ProducerId set to 15752 with epoch 22
2023-04-05 14:01:26,974 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Invoking InitProducerId for the first time in order to acquire a producer ID
2023-04-05 14:01:26,975 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Discovered transaction coordinator b-3.mykafkacluster.amazonaws.com:9096 (id: 3 rack: null)
2023-04-05 14:01:27,077 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] ProducerId set to 16751 with epoch 21
2023-04-05 14:01:27,077 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Invoking InitProducerId for the first time in order to acquire a producer ID
2023-04-05 14:01:27,077 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Discovered transaction coordinator b-1.mykafkacluster.amazonaws.com:9096 (id: 1 rack: null)
2023-04-05 14:01:27,180 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] ProducerId set to 14724 with epoch 20
2023-04-05 14:01:27,180 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Invoking InitProducerId for the first time in order to acquire a producer ID
2023-04-05 14:01:27,181 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Discovered transaction coordinator b-2.mykafkacluster.amazonaws.com:9096 (id: 2 rack: null)
2023-04-05 14:01:27,284 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] ProducerId set to 15768 with epoch 19
2023-04-05 14:01:27,284 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Invoking InitProducerId for the first time in order to acquire a producer ID
2023-04-05 14:01:27,285 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Discovered transaction coordinator b-3.mykafkacluster.amazonaws.com:9096 (id: 3 rack: null)
2023-04-05 14:01:27,387 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] ProducerId set to 16765 with epoch 18
2023-04-05 14:01:27,387 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Invoking InitProducerId for the first time in order to acquire a producer ID
2023-04-05 14:01:27,388 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Discovered transaction coordinator b-1.mykafkacluster.amazonaws.com:9096 (id: 1 rack: null)
2023-04-05 14:01:27,492 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] ProducerId set to 14737 with epoch 17
2023-04-05 14:01:27,492 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Invoking InitProducerId for the first time in order to acquire a producer ID
2023-04-05 14:01:27,493 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Discovered transaction coordinator b-2.mykafkacluster.amazonaws.com:9096 (id: 2 rack: null)
2023-04-05 14:01:27,596 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] ProducerId set to 15785 with epoch 16
2023-04-05 14:01:27,596 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Invoking InitProducerId for the first time in order to acquire a producer ID
2023-04-05 14:01:27,599 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Discovered transaction coordinator b-3.mykafkacluster.amazonaws.com:9096 (id: 3 rack: null)
2023-04-05 14:01:27,702 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] ProducerId set to 16782 with epoch 15
2023-04-05 14:01:27,702 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Invoking InitProducerId for the first time in order to acquire a producer ID
2023-04-05 14:01:27,703 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Discovered transaction coordinator b-2.mykafkacluster.amazonaws.com:9096 (id: 2 rack: null)
2023-04-05 14:01:27,815 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] ProducerId set to 15798 with epoch 14
2023-04-05 14:01:27,815 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Invoking InitProducerId for the first time in order to acquire a producer ID
2023-04-05 14:01:27,816 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Discovered transaction coordinator b-1.mykafkacluster.amazonaws.com:9096 (id: 1 rack: null)
2023-04-05 14:01:27,919 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] ProducerId set to 14759 with epoch 13
2023-04-05 14:01:27,919 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Invoking InitProducerId for the first time in order to acquire a producer ID
2023-04-05 14:01:27,920 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Discovered transaction coordinator b-3.mykafkacluster.amazonaws.com:9096 (id: 3 rack: null)
2023-04-05 14:01:28,022 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] ProducerId set to 16803 with epoch 12
2023-04-05 14:01:28,023 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Invoking InitProducerId for the first time in order to acquire a producer ID
2023-04-05 14:01:28,024 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Discovered transaction coordinator b-2.mykafkacluster.amazonaws.com:9096 (id: 2 rack: null)
2023-04-05 14:01:28,126 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] ProducerId set to 15820 with epoch 11
2023-04-05 14:01:28,126 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Invoking InitProducerId for the first time in order to acquire a producer ID
2023-04-05 14:01:28,127 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Discovered transaction coordinator b-1.mykafkacluster.amazonaws.com:9096 (id: 1 rack: null)
2023-04-05 14:01:28,230 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] ProducerId set to 14782 with epoch 10
2023-04-05 14:01:28,230 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Invoking InitProducerId for the first time in order to acquire a producer ID
2023-04-05 14:01:28,231 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Discovered transaction coordinator b-3.mykafkacluster.amazonaws.com:9096 (id: 3 rack: null)
2023-04-05 14:01:28,333 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] ProducerId set to 16825 with epoch 9
2023-04-05 14:01:28,333 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Invoking InitProducerId for the first time in order to acquire a producer ID
2023-04-05 14:01:28,334 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Discovered transaction coordinator b-2.mykafkacluster.amazonaws.com:9096 (id: 2 rack: null)
2023-04-05 14:01:28,436 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] ProducerId set to 15844 with epoch 8
2023-04-05 14:01:28,436 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Invoking InitProducerId for the first time in order to acquire a producer ID
2023-04-05 14:01:28,437 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Discovered transaction coordinator b-1.mykafkacluster.amazonaws.com:9096 (id: 1 rack: null)
2023-04-05 14:01:28,542 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] ProducerId set to 14806 with epoch 7
2023-04-05 14:01:28,543 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Invoking InitProducerId for the first time in order to acquire a producer ID
2023-04-05 14:01:28,544 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Discovered transaction coordinator b-3.mykafkacluster.amazonaws.com:9096 (id: 3 rack: null)
2023-04-05 14:01:28,646 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] ProducerId set to 16851 with epoch 6
2023-04-05 14:01:28,646 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Invoking InitProducerId for the first time in order to acquire a producer ID
2023-04-05 14:01:28,647 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Discovered transaction coordinator b-2.mykafkacluster.amazonaws.com:9096 (id: 2 rack: null)
2023-04-05 14:01:28,749 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] ProducerId set to 15869 with epoch 5
2023-04-05 14:01:28,749 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Invoking InitProducerId for the first time in order to acquire a producer ID
2023-04-05 14:01:28,751 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Discovered transaction coordinator b-3.mykafkacluster.amazonaws.com:9096 (id: 3 rack: null)
2023-04-05 14:01:28,853 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] ProducerId set to 16868 with epoch 4
2023-04-05 14:01:28,853 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Invoking InitProducerId for the first time in order to acquire a producer ID
2023-04-05 14:01:28,854 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Discovered transaction coordinator b-2.mykafkacluster.amazonaws.com:9096 (id: 2 rack: null)
2023-04-05 14:01:28,956 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] ProducerId set to 15887 with epoch 3
2023-04-05 14:01:28,956 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Invoking InitProducerId for the first time in order to acquire a producer ID
2023-04-05 14:01:28,957 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Discovered transaction coordinator b-1.mykafkacluster.amazonaws.com:9096 (id: 1 rack: null)
2023-04-05 14:01:29,060 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] ProducerId set to 14846 with epoch 2
2023-04-05 14:01:29,060 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Invoking InitProducerId for the first time in order to acquire a producer ID
2023-04-05 14:01:29,061 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Discovered transaction coordinator b-3.mykafkacluster.amazonaws.com:9096 (id: 3 rack: null)
2023-04-05 14:01:29,163 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] ProducerId set to 16900 with epoch 1
2023-04-05 14:01:29,163 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Invoking InitProducerId for the first time in order to acquire a producer ID
2023-04-05 14:01:29,163 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Discovered transaction coordinator b-2.mykafkacluster.amazonaws.com:9096 (id: 2 rack: null)
2023-04-05 14:01:29,268 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] ProducerId set to 15918 with epoch 0
2023-04-05 14:01:29,268 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Invoking InitProducerId for the first time in order to acquire a producer ID
2023-04-05 14:01:29,269 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Discovered transaction coordinator b-3.mykafkacluster.amazonaws.com:9096 (id: 3 rack: null)
2023-04-05 14:01:29,374 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] ProducerId set to 16748 with epoch 22
2023-04-05 14:01:29,374 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Invoking InitProducerId for the first time in order to acquire a producer ID
2023-04-05 14:01:29,375 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Discovered transaction coordinator b-1.mykafkacluster.amazonaws.com:9096 (id: 1 rack: null)
2023-04-05 14:01:29,478 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] ProducerId set to 14721 with epoch 21
2023-04-05 14:01:29,478 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Invoking InitProducerId for the first time in order to acquire a producer ID
2023-04-05 14:01:29,479 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Discovered transaction coordinator b-2.mykafkacluster.amazonaws.com:9096 (id: 2 rack: null)
2023-04-05 14:01:29,582 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] ProducerId set to 15763 with epoch 20
2023-04-05 14:01:29,582 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Invoking InitProducerId for the first time in order to acquire a producer ID
2023-04-05 14:01:29,582 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Discovered transaction coordinator b-3.mykafkacluster.amazonaws.com:9096 (id: 3 rack: null)
2023-04-05 14:01:29,684 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] ProducerId set to 16761 with epoch 19
2023-04-05 14:01:29,685 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Invoking InitProducerId for the first time in order to acquire a producer ID
2023-04-05 14:01:29,685 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Discovered transaction coordinator b-1.mykafkacluster.amazonaws.com:9096 (id: 1 rack: null)
2023-04-05 14:01:29,788 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] ProducerId set to 14734 with epoch 18
2023-04-05 14:01:29,788 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Invoking InitProducerId for the first time in order to acquire a producer ID
2023-04-05 14:01:29,790 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Discovered transaction coordinator b-2.mykafkacluster.amazonaws.com:9096 (id: 2 rack: null)
2023-04-05 14:01:29,895 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] ProducerId set to 15779 with epoch 17
2023-04-05 14:01:29,895 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Invoking InitProducerId for the first time in order to acquire a producer ID
2023-04-05 14:01:29,896 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Discovered transaction coordinator b-3.mykafkacluster.amazonaws.com:9096 (id: 3 rack: null)

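This repeats for 20 minutes or even longer.
I tried removing the Kafka sink, and the application started within a few seconds, but then I lose the output. I also tried changing the sink's delivery guarantee to AT_LEAST_ONCE; startup seems faster (about 1 minute), but the logs still contain many of those lines.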
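To put a rough number on the log volume, the gap between consecutive "Invoking InitProducerId" lines can be measured directly from the timestamps. The following is a minimal sketch, assuming the log layout shown in the excerpt (a 23-character timestamp at the start of each line); the class and method names are made up for illustration.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.List;
import java.util.stream.Collectors;

final class InitProducerIdRate {
    // Timestamp layout at the start of each log line, e.g. "2023-04-05 14:01:25,828".
    private static final DateTimeFormatter TS =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss,SSS");
    private static final int TS_LENGTH = 23;

    /** Returns the timestamps of all lines that announce an InitProducerId call. */
    static List<LocalDateTime> initCallTimes(List<String> logLines) {
        return logLines.stream()
                .filter(line -> line.contains("Invoking InitProducerId"))
                .map(line -> LocalDateTime.parse(line.substring(0, TS_LENGTH), TS))
                .collect(Collectors.toList());
    }

    /** Average gap in milliseconds between consecutive InitProducerId calls. */
    static double averageIntervalMillis(List<String> logLines) {
        List<LocalDateTime> times = initCallTimes(logLines);
        if (times.size() < 2) {
            throw new IllegalArgumentException("need at least two InitProducerId lines");
        }
        Duration span = Duration.between(times.get(0), times.get(times.size() - 1));
        return (double) span.toMillis() / (times.size() - 1);
    }

    public static void main(String[] args) {
        // Three timestamps taken from the excerpt; the rest of the line is the same each time.
        String suffix = " INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - "
                + "[Producer clientId=cid, transactionalId=kafka-sink-0-1] "
                + "Invoking InitProducerId for the first time in order to acquire a producer ID";
        List<String> lines = List.of(
                "2023-04-05 14:01:25,828" + suffix,
                "2023-04-05 14:01:25,932" + suffix,
                "2023-04-05 14:01:26,035" + suffix);
        System.out.println("average interval (ms): " + averageIntervalMillis(lines));
    }
}
```

At roughly 100 ms per call, 20 minutes of startup would correspond to something on the order of 11,000 to 12,000 InitProducerId calls, which is in line with the "thousands of lines" observed.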
Here is the sink configuration (some authentication-related properties are hidden and are loaded together with the appConfiguration object):

KafkaSink<DeviceDayTimeTuple> deviceDaySink = KafkaSink.<DeviceDayTimeTuple>builder()
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setTopic(appConfiguration.getString("device-day"))
                        .setValueSerializationSchema(new SerializeJsonDeviceDayTime())
                        .build()
                )
                .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                .setKafkaProducerConfig(appConfiguration.getKafkaProps())
                .setProperty("transactional.id", "tid")
                .setProperty("client.id", "cid")
                .setProperty("transaction.timeout.ms", "30000")
                .build();

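Edit with more information (April 12): For now we checkpoint every 30 seconds. The topic had already been written to by the same application, but the problem is the same if we recreate the topic. Using Kafka's CLI script kafka-transactions.sh, we noticed that the Flink application opens a lot of empty transactions at startup: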

kafka-sink-0-3394   3           18862       Empty               
kafka-sink-0-4122   3           21106       Empty               
kafka-sink-0-6148   3           21793       Empty               
kafka-sink-0-4366   3           21189       Empty               
kafka-sink-0-2584   3           18585       Empty               
kafka-sink-0-2340   3           18501       Empty 
kafka-sink-0-30     3           16458       Empty
kafka-sink-0-4816   3           21341       Empty 
kafka-sink-0-308    3           16553       Empty
kafka-sink-0-2872   3           18682       Empty 
kafka-sink-0-4654   3           21288       Empty 
kafka-sink-0-3600   3           18931       Empty               
kafka-sink-0-4898   3           21369       Empty 
kafka-sink-0-146    3           16498       Empty
kafka-sink-0-5626   3           21616       Empty               
kafka-sink-0-3844   3           21014       Empty  
kafka-sink-0-4492   3           21231       Empty 
kafka-sink-0-272    3           16541       Empty 
kafka-sink-0-5220   3           21478       Empty 
kafka-sink-0-5464   3           21561       Empty  
kafka-sink-0-3682   3           18958       Empty

These are only part of the full list; both the transactionalId and the producerId values match those in the log file.
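The ids in the listing appear to follow the pattern prefix-subtaskId-checkpointOffset, with "kafka-sink" as the prefix. As far as I know this is how Flink's KafkaSink derives transactional ids, but treat that as an assumption rather than a confirmed detail. The sketch below splits such an id into its parts so a listing can be summarised. The class name is hypothetical, and the column order (id, coordinator, producer id, state) is inferred from the output above.

```java
import java.util.List;

final class TransactionalIdParts {
    final String prefix;
    final int subtaskId;
    final long checkpointOffset;

    private TransactionalIdParts(String prefix, int subtaskId, long checkpointOffset) {
        this.prefix = prefix;
        this.subtaskId = subtaskId;
        this.checkpointOffset = checkpointOffset;
    }

    /** Splits "prefix-subtaskId-checkpointOffset" on its last two dashes. */
    static TransactionalIdParts parse(String transactionalId) {
        int lastDash = transactionalId.lastIndexOf('-');
        int secondLastDash = transactionalId.lastIndexOf('-', lastDash - 1);
        if (secondLastDash < 0) {
            throw new IllegalArgumentException("unexpected id format: " + transactionalId);
        }
        return new TransactionalIdParts(
                transactionalId.substring(0, secondLastDash),
                Integer.parseInt(transactionalId.substring(secondLastDash + 1, lastDash)),
                Long.parseLong(transactionalId.substring(lastDash + 1)));
    }

    /** Highest checkpoint offset among the rows of a kafka-transactions.sh style listing. */
    static long maxCheckpointOffset(List<String> rows) {
        long max = Long.MIN_VALUE;
        for (String row : rows) {
            // First whitespace-separated column is assumed to be the transactional id.
            String id = row.trim().split("\\s+")[0];
            max = Math.max(max, parse(id).checkpointOffset);
        }
        return max;
    }

    public static void main(String[] args) {
        List<String> rows = List.of(
                "kafka-sink-0-3394   3           18862       Empty",
                "kafka-sink-0-6148   3           21793       Empty",
                "kafka-sink-0-30     3           16458       Empty");
        System.out.println("max checkpoint offset: " + maxCheckpointOffset(rows));
    }
}
```

If the pattern holds, every id in the listing belongs to subtask 0, and the offsets range into the thousands, which would fit a sink that initialises one producer per offset at startup.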

von4xj4u  #1

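We ran into the same problem and noticed that it comes from ungraceful shutdowns. For example, when you run the application locally in IntelliJ and press the stop button, the close method is not called. Unfortunately, graceful shutdown is only available in run mode, not in debug mode.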
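The difference between a graceful and an ungraceful stop can be illustrated with a plain JVM shutdown hook. This is a generic sketch, not Flink's or Kafka's actual shutdown code: FakeProducer is a hypothetical stand-in for a transactional producer, and closeOnShutdown is a made-up helper. On a normal exit or SIGTERM the JVM runs the hook and close is called. A forcibly killed JVM (for example SIGKILL) runs no hooks, so whatever the resource had open is left behind.

```java
final class GracefulClose {
    /** Hypothetical stand-in for a transactional producer; not a Kafka class. */
    static final class FakeProducer implements AutoCloseable {
        private volatile boolean closed;

        @Override
        public void close() {
            closed = true;
            System.out.println("producer closed");
        }

        boolean isClosed() {
            return closed;
        }
    }

    /** Registers a JVM shutdown hook that closes the resource, and returns the hook thread. */
    static Thread closeOnShutdown(AutoCloseable resource) {
        Thread hook = new Thread(() -> {
            try {
                resource.close();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }, "close-on-shutdown");
        Runtime.getRuntime().addShutdownHook(hook);
        return hook;
    }

    public static void main(String[] args) {
        FakeProducer producer = new FakeProducer();
        closeOnShutdown(producer);
        System.out.println("working...");
        // Returning from main is a normal exit, so the hook runs and close() is called.
        // If the process is killed forcibly instead, the hook never runs.
    }
}
```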
