如何使用SpringBootEmbeddedKafka对kstream拓扑进行集成测试?

hpxqektj  于 2021-06-04  发布在  Kafka
关注(0)|答案(2)|浏览(399)

我有一个简单的springbootkstream拓扑,它将字符串从小写转换为大写。我想我的集成测试启动一个嵌入式Kafka,然后测试拓扑。我想知道是否有可能使用spring编写这样的集成测试 @EmbeddedKafka ?
我看到了几个使用 @EmbeddedKafka 简单的消费者使用 @KafkaListener 但不是任何使用kstream的。
我尝试测试以下拓扑结构,将传入的文本流从小写转换为大写。
以下是拓扑结构:

@Configuration
    public class UppercaseStream {

        private static final String LOWERCASE_TOPIC = "t.lower.case";
        private static final String UPPERCASE_TOPIC = "t.upper.case";

        @Bean
        @Qualifier("kStreamPromoToUppercase")
        public KStream<String, String> kStreamPromoToUppercase(StreamsBuilder builder) {

            KStream<String, String> sourceStream = builder
                    .stream(LOWERCASE_TOPIC, Consumed.with(Serdes.String(), Serdes.String()));

            sourceStream.print(Printed.<String, String>toSysOut().withLabel("Original KStream..."));

            KStream<String, String> upperCaseStream = sourceStream.mapValues(text -> text.toUpperCase());

            upperCaseStream.print(Printed.<String, String>toSysOut().withLabel("Uppercase KStream..."));

            upperCaseStream.to(UPPERCASE_TOPIC);

            return upperCaseStream;
        }
    }

测试拓扑的单元测试是:

@TestInstance(TestInstance.Lifecycle.PER_CLASS)
    public class UpperCaseTopologyTest {
        TopologyTestDriver testDriver;

        @AfterAll
        void tearDown(){
            testDriver.close();
        }

        @Test
        @DisplayName("should transform lowercase to uppercase words")
        void shouldTransformLowercaseWords() {
            //Given
            StreamsBuilder builder = new StreamsBuilder();
            new UppercaseStream().kStreamPromoToUppercase(builder);

            Topology topology = builder.build();
            // setup test driver
            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-stream");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
            props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

            //Create a Topology Test Driver 
            testDriver = new TopologyTestDriver(topology, props);
            TestInputTopic<String, String> inputTopic = testDriver.createInputTopic("t.lower.case", new Serdes.StringSerde().serializer(), new Serdes.StringSerde().serializer());
            TestOutputTopic<String, String> outputTopic = testDriver.createOutputTopic("t.upper.case", new Serdes.StringSerde().deserializer(), new Serdes.StringSerde().deserializer());

            //When
            inputTopic.pipeInput("test");

            //Then
            assertThat(outputTopic.readValue()).isEqualTo("TEST");
        }
    }

我想编写一个集成测试,首先启动一个嵌入式kafka服务器,然后测试uppercasestream拓扑。
我尝试了以下方法:

@SpringBootTest
    @DirtiesContext
    @EmbeddedKafka(partitions = 1, brokerProperties = { "listeners=PLAINTEXT://localhost:9092", "port=9092" })
    class EmbeddedKafkaIntegrationTest {

        @Autowired
        public KafkaTemplate<String, String> template;
        @Autowired
        private KafkaConsumer consumer;
        private KafkaStreams kafkaStreams;

        @Value("${test.topic}")
        private String topic;

        @Autowired
        private KafkaStreamsConfiguration kafkaStreamsConfiguration;

        @Test
        public void should_transform_lowercase_to_uppercase() throws Exception {
            //Create a StreamsBuilder
            StreamsBuilder streamsBuilder = new StreamsBuilder();
            streamsBuilder.stream(topic, Consumed.with(new Serdes.StringSerde(), new Serdes.StringSerde()));

            //Add a topology
            new UppercaseStream().kStreamPromoToUppercase(streamsBuilder);
            kafkaStreams = new KafkaStreams(streamsBuilder.build(), kafkaStreamsConfiguration.asProperties());

            kafkaStreams.start();
            template.send(topic, "test");
            consumer.getLatch().await(10000, TimeUnit.MILLISECONDS);

            assertThat(consumer.getLatch().getCount(), equalTo(0L));
            assertThat(consumer.getPayload(), containsString("TEST"));
        }

        @After
        public void tearDown() {
            if (kafkaStreams!= null) kafkaStreams.close();

        }
    }

测试没有通过Assert。我不知道如何得到kstreampromotouppercasebean。我不确定我是否在尝试遵循正确的方法。

rjee0c15

rjee0c151#

不清楚你的目标是什么 KafkaConsumer 是;大概是包裹在 Consumer<K, V> .
也许你的消费者没有 auto.offset.reset=earliest . 这是这种测试的一个常见错误,在这种测试中,存在一个竞争,消费者可能会在记录发送后开始竞争;默认值为 latest 所以你不会得到这样的记录。
该框架有许多使用嵌入式kafka代理的kafka流测试用例。
https://github.com/spring-projects/spring-kafka/tree/master/spring-kafka/src/test/java/org/springframework/kafka/streams

bvhaajcl

bvhaajcl2#

集成测试缺少一些东西。
一对 NewTopic kafka客户端管理对象需要表示输入和输出主题 @Bean public NewTopic createInputTopic() { return new NewTopic(inputTopic,Optional.of(1), Optional.empty()); } 另一个是输出主题 @Bean public NewTopic createOutputTopic() { return new NewTopic(outputTopic,Optional.of(1), Optional.empty()); } 测试的其余部分仍然大同小异。根据@garry的建议,我使用了kafka消费者。

@SpringBootTest
    @EmbeddedKafka(partitions = 1, brokerProperties = { "listeners=PLAINTEXT://localhost:9092", "port=9092" })
    class KStreamSampleApplicationTests {

        private final KafkaProperties kafkaProperties;
        private final String inputTopic;
        private final String outputTopic;

        @Autowired
        public KStreamSampleApplicationTests(KafkaProperties kafkaProperties, Environment env) {
            this.kafkaProperties = kafkaProperties;
            this.inputTopic = env.getProperty("spring.kafka.input-lowercase-topic");
            this.outputTopic = env.getProperty("spring.kafka.output-uppercase-topic");

        }

        @Test
        @DisplayName("should test uppercaseStream topology")        
        void shouldTestUppercaseStreamTopology() {
            //Given
            Map<String, Object> producerProps = new HashMap<>(KafkaTestUtils.producerProps(
                    String.join(",", kafkaProperties.getBootstrapServers())));

            //Create a kafka producer
            Producer<String, String> producer = new DefaultKafkaProducerFactory<>(producerProps, new StringSerializer(), new StringSerializer()).createProducer();

            Map<String, Object> consumerProps = KafkaTestUtils.consumerProps(String.join(",", kafkaProperties.getBootstrapServers()), "testGroup", "true");
            consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

            //Create a Consumer client
            Consumer<String, String> consumer = new DefaultKafkaConsumerFactory<>(consumerProps, new StringDeserializer(), new StringDeserializer()).createConsumer();
            consumer.subscribe(Collections.singleton(outputTopic));

            //When
            producer.send(new ProducerRecord<>(inputTopic, "test"));
            producer.flush();

            //Then
            assertThat(producer).isNotNull();
            //And
            ConsumerRecords<String, String> rec = consumer.poll(Duration.ofSeconds(3));
            Iterable<ConsumerRecord<String, String>> records = rec.records(outputTopic);
            Iterator<ConsumerRecord<String, String>> iterator = records.iterator();

            if (!iterator.hasNext()) Assertions.fail();

            ConsumerRecord<String, String> next = iterator.next();
            assertThat(next.value()).isEqualTo("TEST");
        }
    }

以下是完整重构解决方案的要点。

相关问题