因此,我为我的 Boot 应用程序设置了一个集成测试,其中Kafka测试容器使用
KAFKA_CONTAINER = (KafkaContainer)(new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka").withTag("5.4.3"))).withReuse(true);
测试在生成消息时运行良好,但是使用者似乎根本不使用消息。我认为这是因为schema registry是一个模拟
spring.kafka.properties.schema.registry.url=mock://localhost
所以我决定使用下面的代码来使用消息,使用一个模拟的注册表。使用KafkaUtils。但是它似乎不起作用
public static KafkaConsumer<String, CarDTO> createEventConsumer() {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainer.getBootstrapServers());
props.put(
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
io.confluent.kafka.serializers.KafkaAvroDeserializer.class);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "mock://testUrl");
props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "kafkatest");
return new KafkaConsumer<>(props);
}
轮询代码如下:
这消费者已经订阅到正确的主题,如果我改变avro到
while (i>0) {
ConsumerRecords<String, CarDTO> records = storeReplenOrderDTOKafkaConsumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, StoreReplenOrderDTO> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
i = 0;
}
我得到的错误如下:
Error deserializing key/value for partition test-topic-0 at offset 0. If needed, please seek past the record to continue consumption.
...
...
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Subject Not Found; error code: 40401
at app//io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient.getSchemaBySubjectAndIdFromRegistry(MockSchemaRegistryClient.java:202)
有趣的是,我将下面的改为,这样我就可以得到键而不是对象
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
我不知道我需要设置什么来让这个与模拟注册表一起工作。
1条答案
按热度按时间yqkkidmi1#
所以我设法让它工作,而不是使用mock://为Kafka注册表,而是使用wiremock存根的响应。|对于avro,一旦我这样做了我的消费者就能够消费/