Kafka试验容器和 Spring Boot

e1xvtsh3  于 2023-03-02  发布在  Spring
关注(0)|答案(1)|浏览(148)

因此,我为我的 Boot 应用程序设置了一个集成测试,其中Kafka测试容器使用

KAFKA_CONTAINER = (KafkaContainer)(new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka").withTag("5.4.3"))).withReuse(true);

测试在生成消息时运行良好,但是使用者似乎根本不使用消息。我认为这是因为schema registry是一个模拟

spring.kafka.properties.schema.registry.url=mock://localhost

所以我决定使用下面的代码来使用消息,使用一个模拟的注册表。使用KafkaUtils。但是它似乎不起作用

public static KafkaConsumer<String, CarDTO> createEventConsumer() {
    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainer.getBootstrapServers());
    props.put(
        ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
        io.confluent.kafka.serializers.KafkaAvroDeserializer.class);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "mock://testUrl");
    props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "kafkatest");
    return new KafkaConsumer<>(props);
  }

轮询代码如下:
这消费者已经订阅到正确的主题,如果我改变avro到

while (i>0) {
            ConsumerRecords<String, CarDTO> records = storeReplenOrderDTOKafkaConsumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, StoreReplenOrderDTO> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
                i = 0;

            }

我得到的错误如下:

Error deserializing key/value for partition test-topic-0 at offset 0. If needed, please seek past the record to continue consumption.
...
...
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Subject Not Found; error code: 40401
    at app//io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient.getSchemaBySubjectAndIdFromRegistry(MockSchemaRegistryClient.java:202)

有趣的是,我将下面的改为,这样我就可以得到键而不是对象

props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);

我不知道我需要设置什么来让这个与模拟注册表一起工作。

yqkkidmi

yqkkidmi1#

所以我设法让它工作,而不是使用mock://为Kafka注册表,而是使用wiremock存根的响应。|对于avro,一旦我这样做了我的消费者就能够消费/

相关问题