Producer errors with Scala embedded Kafka and Kafka Streams

ca1c2owp  posted on 2021-06-06  in  Kafka

I have a test that leaves an open producer thread which keeps logging errors.

  [2018-06-01 15:52:48,526] WARN [Producer clientId=test-deletion-stream-application-9d94ddd6-6f29-4364-890e-0d9676782edd-StreamThread-1-producer] Connection to node -1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
  [2018-06-01 15:52:48,526] WARN [Producer clientId=test-deletion-stream-application-9d94ddd6-6f29-4364-890e-0d9676782edd-StreamThread-1-producer] Connection to node -1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
  [2018-06-01 15:52:48,526] WARN [Producer clientId=test-deletion-stream-application-9d94ddd6-6f29-4364-890e-0d9676782edd-StreamThread-1-producer] Connection to node -1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
  [2018-06-01 15:52:48,628] WARN [Producer clientId=test-deletion-stream-application-9d94ddd6-6f29-4364-890e-0d9676782edd-StreamThread-1-producer] Connection to node -1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
  [2018-06-01 15:52:48,628] WARN [Producer clientId=test-deletion-stream-application-9d94ddd6-6f29-4364-890e-0d9676782edd-StreamThread-1-producer] Connection to node -1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
  [2018-06-01 15:52:48,628] WARN [Producer clientId=test-deletion-stream-application-9d94ddd6-6f29-4364-890e-0d9676782edd-StreamThread-1-producer] Connection to node -1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
  [2018-06-01 15:52:48,730] WARN [Producer clientId=test-deletion-stream-application-9d94ddd6-6f29-4364-890e-0d9676782edd-StreamThread-1-producer] Connection to node -1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
  [2018-06-01 15:52:48,730] WARN [Producer clientId=test-deletion-stream-application-9d94ddd6-6f29-4364-890e-0d9676782edd-StreamThread-1-producer] Connection to node -1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
  [2018-06-01 15:52:48,730] WARN [Producer clientId=test-deletion-stream-application-9d94ddd6-6f29-4364-890e-0d9676782edd-StreamThread-1-producer] Connection to node -1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
  [2018-06-01 15:52:48,982] WARN [Producer clientId=test-deletion-stream-application-9d94ddd6-6f29-4364-890e-0d9676782edd-StreamThread-1-producer] Connection to node -1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
  [2018-06-01 15:52:48,982] WARN [Producer clientId=test-deletion-stream-application-9d94ddd6-6f29-4364-890e-0d9676782edd-StreamThread-1-producer] Connection to node -1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
  [2018-06-01 15:52:48,982] WARN [Producer clientId=test-deletion-stream-application-9d94ddd6-6f29-4364-890e-0d9676782edd-StreamThread-1-producer] Connection to node -1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)

The test works, but it sometimes fails in the way shown above.

  test("My test") {
    val topology = Application.getTopology(...)
    val streams = new KafkaStreams(topology, properties)
    withRunningKafka {
      createCustomTopic(eventTopic)
      val streamId = UUIDs.newUuid().toString
      logger.info(s"Creating stream with Application ID: [$streamId]")
      val streams = new KafkaStreams(topology, streamConfig(streamId, PropertiesConfig.asScalaMap(props)))
      try {
        publishToKafka(eventTopic, key = keyMSite1UID1, message = event11a)
        // ... several more publishings
        Thread.sleep(publishingDelay) // Give time to initialize
        streams.start()
        Thread.sleep(deletionDelay)
        withConsumer[MyKey, MyEvent, Unit] { consumer =>
          val consumedMessages: Stream[(MyKey, MyEvent)] =
            consumer.consumeLazily[(MyKey, MyEvent)](eventTopic)
          val messages = consumedMessages.take(20).toList
          messages.foreach(tuple => logger.info("EVENT END: " + tuple))
          messages.size should be(6)
          // several assertions here
        }
      } finally {
        streams.close()
      }
    }(config)
  }

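One peculiarity is that the streams application produces deletion events on the same topic it consumes from.
There are two similar tests in this suite. I run the test suite under sbt as follows: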

  testOnly *MyTest

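Four out of five runs leave a dangling thread that keeps logging these errors indefinitely. They appear in groups of three, and I don't know why.
I tried adding a delay after calling close(), but it did not seem to help. How can I avoid the dangling producer thread?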

3ks5zfa0


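In your test you create two KafkaStreams instances, but you only close() one of them. I think the producer belongs to the instance you did not close. Note that you need to call KafkaStreams#close() even if you never called KafkaStreams#start().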
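
To illustrate the point above, here is a minimal, self-contained sketch of a loan-style helper that creates exactly one instance and always closes it, whether or not start() was called and whether or not the body throws. FakeStreams is a hypothetical stand-in for KafkaStreams so the example runs without a broker, and StreamsLifecycle and withStreams are illustrative names, not part of Kafka or of the test in the question. The sketch assumes, as the answer implies, that an instance holds its clients from construction onward.

```scala
object StreamsLifecycle {

  // Hypothetical stand-in for KafkaStreams: tracks how many instances are still open.
  object FakeStreams {
    var open: Int = 0
  }

  final class FakeStreams {
    // Assumption: the resource is acquired at construction, before start().
    FakeStreams.open += 1
    private var closed = false

    def start(): Unit = ()

    // Idempotent close: releases the resource at most once.
    def close(): Unit =
      if (!closed) {
        closed = true
        FakeStreams.open -= 1
      }
  }

  // Loan pattern: build the instance once, hand it to the body, always close it.
  def withStreams[A](create: => FakeStreams)(body: FakeStreams => A): A = {
    val streams = create
    try body(streams)
    finally streams.close()
  }
}
```

In the test from the question, the same idea would mean either dropping the first val streams = new KafkaStreams(topology, properties), which is shadowed and never closed, or closing it as well in a finally block.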
