如何使用kafka活页夹对kstream进行单元测试?

oalqel3c  于 2021-06-06  发布在  Kafka
关注(0)|答案(1)|浏览(348)

我想对kafka流聚合进行单元测试,我完全不知道该使用哪种方法。我读过关于testsupportbinder的文章,但我认为这在我的案例中不起作用,因此我使用kafkamebedded方法。这就是我初始化嵌入式Kafka的方式。

@Before
public void setUp() throws Exception{

Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("group-id", "false", embeddedKafka);

consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

DefaultKafkaConsumerFactory<Object, LoggerMessage> cf = new DefaultKafkaConsumerFactory<>(consumerProps);
consumer = cf.createConsumer();
embeddedKafka.consumeFromAnEmbeddedTopic(consumer, OUTPUT_TOPIC);
}

我要测试的内容如下:

public interface Channels {
  String LOGGER_IN_STREAM = "logger-topic-in-stream";
  String LOGGER_IN = "logger-topic-in";
  String LOGGERDATAVALIDATED_OUT = "loggerDataValidated-topic-out";

  @Input(Channels.LOGGER_IN)
  SubscribableChannel processMessage();

  @Input(Channels.LOGGER_IN_STREAM)
  KStream<Object, LoggerMessage> loggerKstreamIn();

  @Output(Channels.LOGGERDATAVALIDATED_OUT)
  MessageChannel validateLoggerData();
}

我得到以下错误信息
org.springframework.beans.factory.beancreationexception:创建名为'some.domain.channels'的bean时出错:调用init方法失败;嵌套异常为java.lang.illegalstateexception:在已注册的工厂中未找到绑定目标类型org.apache.kafka.streams.kstream.kstream的工厂:channelfactory、messagesourcefactory
原因:java.lang.illegalstateexception:在已注册的工厂中找不到绑定目标类型:org.apache.kafka.streams.kstream.kstream的工厂:channelfactory、messagesourcefactory
我做错什么了?

fv2wmkja

fv2wmkja1#

我错过了将我的channels接口作为mockbean注入。我做了那件事之后,一切都如期进行。

相关问题