spring kafka单元测试侦听器未订阅主题

nx7onnlm  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(383)

我有一个与Kafka一起探索Spring的示例项目(在这里找到)。我有一个监听器订阅了主题我的测试主题上游,这将只是lo的消息和键,并发布相同的另一个主题我的测试主题下游。我试过这是本地Kafka(docker撰写文件在那里)和它的作品。
现在我试着用一个嵌入式Kafka服务器来编写一个测试。在测试中,我有一个嵌入式服务器正在启动(testcontext.java),它应该在测试之前启动(在测试之前重写junit)。

private static EmbeddedKafkaBroker kafka() {
    EmbeddedKafkaBroker kafkaEmbedded =
        new EmbeddedKafkaBroker(
            3,
            false,
            1,
            "my-test-topic-upstream", "my-test-topic-downstream");
    Map<String, String> brokerProperties = new HashMap<>();
    brokerProperties.put("default.replication.factor", "1");
    brokerProperties.put("offsets.topic.replication.factor", "1");
    brokerProperties.put("group.initial.rebalance.delay.ms", "3000");
    kafkaEmbedded.brokerProperties(brokerProperties);
    try {
      kafkaEmbedded.afterPropertiesSet();
    } catch (Exception e) {
      throw new RuntimeException(e);
    }

    return kafkaEmbedded;
  }

然后我创建一个生产者(tickproducer)并发布一条消息到我希望我的听众能够使用的主题。

public TickProducer(String brokers) {
    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

    producer = new KafkaProducer<>(props);
  }

  public RecordMetadata publishTick(String brand)
          throws ExecutionException, InterruptedException {
    return publish(TOPIC, brand, Instant.now().toString());
  }

  private RecordMetadata publish(String topic, String key, String value)
      throws ExecutionException, InterruptedException {
    final RecordMetadata recordMetadata;
    recordMetadata = producer.send(new ProducerRecord<>(topic, key, value)).get();
    producer.flush();
    return recordMetadata;
  }

我看到下面的日志消息继续记录。

11:32:35.745 [main] WARN  o.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-1, groupId=my-test-group] Connection to node -1 could not be established. Broker may not be available.

最终以失败告终

11:36:52.774 [main] ERROR o.s.boot.SpringApplication - Application run failed
org.springframework.context.ApplicationContextException: Failed to start bean 'org.springframework.kafka.config.internalKafkaListenerEndpointRegistry'; nested exception is org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata

有什么建议吗?

bpzcxfmw

bpzcxfmw1#

查看信息日志 ConsumerConfig 看看他想联系什么(把它和 ProducerConfig ). 我怀疑你没有更新Spring Boot bootstrap-servers 属性指向嵌入的代理。
看到了吗

/**
 * Set the system property with this name to the list of broker addresses.
 * @param brokerListProperty the brokerListProperty to set
 * @return this broker.
 * @since 2.3
 */
public EmbeddedKafkaBroker brokerListProperty(String brokerListProperty) {
    this.brokerListProperty = brokerListProperty;
    return this;
}

设置为 spring.kafka.bootstrap-servers 然后用它来代替 SPRING_EMBEDDED_KAFKA_BROKERS .
顺便说一句,使用 @EmbeddedKafka 而不是自己示例化服务器。

相关问题