我正试图为一个定制流处理器编写单元测试,却被序列化测试所需发送的消息所困扰。我以Kafka为例:https://kafka.apache.org/11/documentation/streams/developer-guide/testing.html . 我对流中的自定义类(自动生成的avro类)使用specificavroserde,但是我不能在测试中使用mockschemaregistryclient()配置它,我只能指向sr的url。
Serde<MyCustomObject> valueSerde = new SpecificAvroSerde<>();
Map<String, String> valueSerdeConfig = new HashMap<>();
valueSerdeConfig.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "fake");
valueSerdeConfig.put(AbstractKafkaAvroSerDeConfig.AUTO_REGISTER_SCHEMAS, "true");
valueSerde.configure(valueSerdeConfig, false);
ConsumerRecordFactory<Long, MyCustomObject> recordFactory = new ConsumerRecordFactory<>(new LongSerializer(), valueSerde.serializer());
使用kafkaavroserializer,我可以这样初始化它:
KafkaAvroSerializer serializer = new KafkaAvroSerializer(schemaRegistryClient);
但消费者唱片厂不会把Kafka夫罗塞利泽作为论据。
有没有别的办法或者我不知道的方法?
谢谢你的帮助。
2条答案
按热度按时间qybjjes11#
我正在为所有avro定义生成java模型:
然后我只在模拟Kafka集群中生成一些消息
就这样。在拓扑中,我使用基于avro定义生成的java模型(类)。
56lgkhnf2#
谢谢你提出的解决方案,但我今天确实找到了一个。serdes由序列化程序和反序列化程序组成:https://kafka.apache.org/11/javadoc/org/apache/kafka/common/serialization/serdes.html#serdefrom-org.apache.kafka.common.serialization.serializer-org.apache.kafka.common.serialization.deserializer-因此我用kafkaavroserializer和kafkaavrodeserializer构建serde,如下所示:
实现序列化程序的每个类都可以是serde的一部分。