我正在使用KStreams和Avro创建一个应用程序。我尝试使用嵌入式Kafka进行单元测试用例,但出现以下异常:创建名为“kafkaEmbedded”的bean时出错:调用init方法失败;嵌套异常为java.lang.NoSuchMethodError:org.apache.kafka.test.TestUtils.tempDirectory()Ljava/io/File;有没有更好的方法/解决方案来使用Kafka流编写单元测试用例?
hof1towb1#
我启动一个Avro schema-registry,每个测试都有一个不同的端口。像这样:
@Test public void testKafkaWithAvro() { try { Process process = Runtime.getRuntime().exec("schema-registry-start ./schema-registry.properties"); final String TEST_URL = "http://localhost:8181"; AvroValue avroValue = new AvroValue(); avroValue.setId(1); avroValue.setField(1); // Start creating and configuring the stream processing StreamsBuilder builder = new StreamsBuilder(); myConfigFactory myConfigFactory = new myConfigFactory(); myConfigFactory.setCdcChromeImageTopic("Topic1"); myConfig myConfig = myConfigFactory.build(); // call the code to be tested myBuilder myBuilder = new myBuilder(myConfig); myBuilder.initializeStreams(builder); Properties props = new Properties(); props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-unit-test"); props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091"); props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, SpecificAvroSerde.class.getName()); props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class.getName()); props.setProperty(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, CustomTimestampExtractor.class.getName()); props.setProperty(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, TEST_URL); this.config = new StreamsConfig(props); // Create the topology to start testing Topology topology = builder.build(); ProcessorTopologyTestDriver driver = new ProcessorTopologyTestDriver(this.config, topology); final Map<String, String> serdeConfig = new HashMap<>(); serdeConfig.put("schema.registry.url", TEST_URL); serdeConfig.put(AbstractKafkaAvroSerDeConfig.AUTO_REGISTER_SCHEMAS, "true"); Serde<AvroKey> keyAvroSerde = new SpecificAvroSerde<>(); keyAvroSerde.configure(serdeConfig, true); // `true` for record keys final Serde<AvroValue> valueAvroSerde = new SpecificAvroSerde<>(); valueAvroSerde.configure(serdeConfig, false); // `false` for record values // Run a test with something that should pass the filter driver.process(myConfigFactory.getFirstTopic(), testKey, testValue, keyAvroSerde.serializer(), valueAvroSerde.serializer()); ProducerRecord<myKey, dbo_my> recordPassesFilter = driver.readOutput(myConfigFactory.getOutTopic(), keyAvroSerde.deserializer(), valueAvroSerde.deserializer()); ProducerRecord<OutAvroKey, OutAvroValue> recordOut = new ProducerRecord<>(myConfigFactory.getOutTopic(), null, 0L, outKey, outValue); assertEquals("The filtered output keys didn't match", recordOut.key(), recordPassesFilter.key()); assertEquals("The filtered output values didn't match", recordOut.value(), recordPassesFilter.value()); keyAvroSerde.close(); valueAvroSerde.close(); process.destroy(); } catch (Exception e) { fail(e); } }
字符串
1条答案
按热度按时间hof1towb1#
我启动一个Avro schema-registry,每个测试都有一个不同的端口。像这样:
字符串