抱歉,这个问题太笼统了,但是有人提供了一些关于如何使用kafka embedded执行生产者和消费者测试的教程或指南。我试过几个,但是有几个版本的依赖关系,没有一个真正起作用=/我用的是Spring Cloud溪Kafka。
j91ykkif1#
我们通常建议在测试中使用测试绑定器,但如果您想使用嵌入式kafka服务器,可以这样做。。。把这个加到你的pom里。。。
<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka-test</artifactId> <scope>test</scope> </dependency>
测试应用程序。。。
@SpringBootApplication @EnableBinding(Processor.class) public class So43330544Application { public static void main(String[] args) { SpringApplication.run(So43330544Application.class, args); } @StreamListener(Processor.INPUT) @SendTo(Processor.OUTPUT) public byte[] handle(byte[] in){ return new String(in).toUpperCase().getBytes(); } }
应用程序属性。。。
spring.cloud.stream.bindings.output.destination=so0544out spring.cloud.stream.bindings.input.destination=so0544in spring.cloud.stream.bindings.output.producer.headerMode=raw spring.cloud.stream.bindings.input.consumer.headerMode=raw spring.cloud.stream.bindings.input.group=so0544
测试用例。。。
@RunWith(SpringRunner.class) @SpringBootTest public class So43330544ApplicationTests { @ClassRule public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1); @Autowired private KafkaTemplate<byte[], byte[]> template; @Autowired private KafkaProperties properties; @BeforeClass public static void setup() { System.setProperty("spring.kafka.bootstrap-servers", embeddedKafka.getBrokersAsString()); } @Test public void testSendReceive() { template.send("so0544in", "foo".getBytes()); Map<String, Object> configs = properties.buildConsumerProperties(); configs.put(ConsumerConfig.GROUP_ID_CONFIG, "test0544"); configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); ConsumerFactory<byte[], byte[]> cf = new DefaultKafkaConsumerFactory<>(configs); Consumer<byte[], byte[]> consumer = cf.createConsumer(); consumer.subscribe(Collections.singleton("so0544out")); ConsumerRecords<byte[], byte[]> records = consumer.poll(10_000); consumer.commitSync(); assertThat(records.count()).isEqualTo(1); assertThat(new String(records.iterator().next().value())).isEqualTo("FOO"); } }
1条答案
按热度按时间j91ykkif1#
我们通常建议在测试中使用测试绑定器,但如果您想使用嵌入式kafka服务器,可以这样做。。。
把这个加到你的pom里。。。
测试应用程序。。。
应用程序属性。。。
测试用例。。。