嵌入式kafka测试(由sbt运行)间歇性失败,并出现zookeeperserver错误

64jmpszr  于 2021-06-08  发布在  Kafka
关注(0)|答案(0)|浏览(290)

我正在使用spring-kafkamebedded测试工具编写一组测试。每个测试分别支持一个嵌入的kafka示例,生成事件并Assert生成的下游事件。
当在ide中运行时,测试总是通过的(例如intellij),但是当使用sbt运行时,测试会间歇性地失败(大约50%的时间,没有规律性)。如果测试失败,我会看到以下错误:

  1. o.a.zookeeper.server.ZooKeeperServer - ZKShutdownHandler is not registered, so ZooKeeper server won't take any action on ERROR or SHUTDOWN server state changes

此外,我还看到许多报告zookeeper节点丢失的信息日志,例如:

  1. o.a.z.server.PrepRequestProcessor - Got user-level KeeperException when processing sessionid:0x164a50fe9fd0001 type:create cxid:0x5 zxid:0x4 txntype:-1 reqpath:n/a Error Path:/brokers Error:KeeperErrorCode = NoNode for /brokers

这些日志不会出现在成功的测试中。当我说“我看到许多信息日志”时,我指的是许多,大约40个这样的日志,其中一些节点路径嵌套在以前报告的节点路径中。
研究表明,错误日志是无辜的,我想信息日志也是无辜的,但它们在测试失败时是独立的。
7/17更新:
K流/生产者/消费者配置:

  1. Properties properties = new Properties();
  2. properties.put(StreamsConfig.APPLICATION_ID_CONFIG, appName);
  3. properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapUrls);
  4. properties.put(StreamsConfig.STATE_DIR_CONFIG, String.format("/tmp/kafka-streams/%s/%s",
  5. properties.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
  6. properties.put(StreamsConfig.CLIENT_ID_CONFIG, appName);
  7. properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
  8. properties.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 1);
  9. properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
  10. properties.put(ConsumerConfig.GROUP_ID_CONFIG, appName);
  11. properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class
  12. properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.cla
  13. properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
  14. properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);

嵌入式Kafka启动:

  1. @Rule
  2. public KafkaEmbedded kafka = new KafkaEmbedded(getNumKafkaServers(), true, getNumPartitionsPerTopic(), getTopics().keySet().toArray(new String[0]));

试验前:

  1. @Before
  2. public void before() {
  3. // build KStreams and start topology
  4. KStreamBuilder kStreamBuilder = new KStreamBuilder();
  5. buildStream(kStreamBuilder);
  6. KafkaStreams kafkaStreams = new KafkaStreams(kStreamBuilder, streamsConfig);
  7. kafkaStreams.start();
  8. }

试验时间:

  1. @After
  2. public void after() {
  3. kafka.destroy();
  4. FileUtils.deleteDirectory(new File(streamsConfig.getString(StreamsConfig.STATE_DIR_CONFIG)));
  5. }

7/17更新:
更详细地说,这是一个spring项目,每个测试都被注解如下:

  1. @SpringBootTest(classes = <this-test-class>.class)
  2. @ActiveProfiles("test")
  3. @RunWith(SpringRunner.class)

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题