How to write JUnit test cases for Kafka Streams using the Avro Serializer and Deserializer

xt0899hw  posted on 2024-01-08  in Kafka

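I am building an application with KStreams and Avro. I tried to write unit tests using embedded Kafka, but I get the following exception:
Error creating bean with name 'kafkaEmbedded': Invocation of init method failed; nested exception is java.lang.NoSuchMethodError: org.apache.kafka.test.TestUtils.tempDirectory()Ljava/io/File;
Is there a better way to write unit tests for Kafka Streams?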

hof1towb #1

I start an Avro schema-registry for each test, each on a different port. Like this:

    @Test
    public void testKafkaWithAvro() {
        try {
            Process process =
                    Runtime.getRuntime().exec("schema-registry-start ./schema-registry.properties");

            final String TEST_URL = "http://localhost:8181";
            AvroValue avroValue = new AvroValue();
            avroValue.setId(1);
            avroValue.setField(1);

            // Start creating and configuring the stream processing
            StreamsBuilder builder = new StreamsBuilder();

            myConfigFactory myConfigFactory = new myConfigFactory();
            myConfigFactory.setCdcChromeImageTopic("Topic1");

            myConfig myConfig = myConfigFactory.build();

            // call the code to be tested
            myBuilder myBuilder = new myBuilder(myConfig);
            myBuilder.initializeStreams(builder);

            Properties props = new Properties();
            props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-unit-test");
            props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
            props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
                    SpecificAvroSerde.class.getName());
            props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
                    SpecificAvroSerde.class.getName());
            props.setProperty(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
                    CustomTimestampExtractor.class.getName());
            props.setProperty(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, TEST_URL);
            this.config = new StreamsConfig(props);

            // Create the topology to start testing
            Topology topology = builder.build();
            ProcessorTopologyTestDriver driver =
                    new ProcessorTopologyTestDriver(this.config, topology);

            final Map<String, String> serdeConfig = new HashMap<>();
            serdeConfig.put("schema.registry.url", TEST_URL);
            serdeConfig.put(AbstractKafkaAvroSerDeConfig.AUTO_REGISTER_SCHEMAS, "true");

            Serde<AvroKey> keyAvroSerde = new SpecificAvroSerde<>();
            keyAvroSerde.configure(serdeConfig, true); // `true` for record keys
            final Serde<AvroValue> valueAvroSerde = new SpecificAvroSerde<>();
            valueAvroSerde.configure(serdeConfig, false); // `false` for record values

            // Run a test with something that should pass the filter
            driver.process(myConfigFactory.getFirstTopic(), testKey,
                    testValue, keyAvroSerde.serializer(),
                    valueAvroSerde.serializer());

            ProducerRecord<myKey, dbo_my> recordPassesFilter =
                    driver.readOutput(myConfigFactory.getOutTopic(),
                            keyAvroSerde.deserializer(),
                            valueAvroSerde.deserializer());

            ProducerRecord<OutAvroKey, OutAvroValue> recordOut =
                    new ProducerRecord<>(myConfigFactory.getOutTopic(), null, 0L,
                            outKey, outValue);

            assertEquals("The filtered output keys didn't match", recordOut.key(),
                    recordPassesFilter.key());
            assertEquals("The filtered output values didn't match", recordOut.value(),
                    recordPassesFilter.value());
            keyAvroSerde.close();
            valueAvroSerde.close();
            process.destroy();
        } catch (Exception e) {
            // fail(String) exists in both JUnit 4 and JUnit 5; fail(Throwable) is JUnit 5 only
            fail(e.toString());
        }
    }
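
The snippet above hard-codes port 8181 and talks to the registry right after launching the process, so two tests running at once could collide, and the first request could arrive before the registry is listening. Below is a minimal sketch, using only the JDK, of how one might pick a free port and wait until something accepts connections on it. `RegistryTestSupport`, `findFreePort`, and `waitForPort` are hypothetical names for illustration, not part of Kafka or the Confluent libraries, and the chosen port would still have to be written into the properties file passed to `schema-registry-start`.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.time.Duration;

// Hypothetical test helper; the names are illustrative only.
final class RegistryTestSupport {

    private RegistryTestSupport() {
    }

    // Asks the OS for an unused TCP port by binding to port 0, then releases it.
    // Another process could grab the port before the registry starts, so this
    // only reduces the chance of a collision; it does not rule one out.
    static int findFreePort() {
        try (ServerSocket socket = new ServerSocket(0)) {
            socket.setReuseAddress(true);
            return socket.getLocalPort();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Polls host:port until a TCP connection succeeds or the timeout elapses.
    // Returns true if a connection was established, false otherwise.
    static boolean waitForPort(String host, int port, Duration timeout) {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (System.nanoTime() < deadline) {
            try (Socket socket = new Socket()) {
                socket.connect(new InetSocketAddress(host, port), 200);
                return true;
            } catch (IOException e) {
                // Nothing is listening yet; retry after a short pause.
            }
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return false;
    }
}
```

In the test, one could call `findFreePort()` before starting the process, build `TEST_URL` from the result, and only continue once `waitForPort("localhost", port, Duration.ofSeconds(30))` returns true. Moving `process.destroy()` into a `finally` block would also stop the registry when an assertion fails.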
