测试使用特定avroserde的kafka处理器api

e5njpo68  于 2021-06-08  发布在  Kafka
关注(0)|答案(2)|浏览(454)

我正试图为一个定制流处理器编写单元测试,却被序列化测试所需发送的消息所困扰。我以Kafka为例:https://kafka.apache.org/11/documentation/streams/developer-guide/testing.html . 我对流中的自定义类(自动生成的avro类)使用specificavroserde,但是我不能在测试中使用mockschemaregistryclient()配置它,我只能指向sr的url。

Serde<MyCustomObject> valueSerde = new SpecificAvroSerde<>();
    Map<String, String> valueSerdeConfig = new HashMap<>();
    valueSerdeConfig.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "fake");
    valueSerdeConfig.put(AbstractKafkaAvroSerDeConfig.AUTO_REGISTER_SCHEMAS, "true");
    valueSerde.configure(valueSerdeConfig, false);
    ConsumerRecordFactory<Long, MyCustomObject> recordFactory = new ConsumerRecordFactory<>(new LongSerializer(), valueSerde.serializer());

使用kafkaavroserializer,我可以这样初始化它:

KafkaAvroSerializer serializer = new KafkaAvroSerializer(schemaRegistryClient);

但消费者唱片厂不会把Kafka夫罗塞利泽作为论据。
有没有别的办法或者我不知道的方法?
谢谢你的帮助。

qybjjes1

qybjjes11#

我正在为所有avro定义生成java模型:


# !/usr/bin/env bash

if [ ! -f avro-tools-1.8.2.jar ]; then
    wget http://tux.rainside.sk/apache/avro/avro-1.8.2/java/avro-tools-1.8.2.jar
    chmod +x avro-tools-1.8.2.jar
fi

java -jar avro-tools-1.8.2.jar compile schema ../avro/raw/* ../../java/

然后我只在模拟Kafka集群中生成一些消息

public abstract class ViewPageEventGenerator {

    @NotNull
    public static KeyValue<List, HashMap<String, String>> getSimpleViewPages() {
        List<KeyValue<CustomerKey, ViewPage>> inputValues = new ArrayList<>();
        HashMap<String, String> requestExpectedValuePairs = new HashMap<>();

        inputValues = Arrays.asList(
                new KeyValue<>(
                        new CustomerKey(1912, "Alan Turing"),
                        new ViewPage("Alan Turing", false,
                                Double.parseDouble(String.valueOf(System.currentTimeMillis())),
                                "https://alan.turing/", "192.168.0.1", "Turing Machine", "Punch card"
                        )
                ),
                new KeyValue<>(
                        new CustomerKey(1912, "Alan Turing"),
                        new ViewPage("Alan Turing", false,
                                Double.parseDouble(String.valueOf(System.currentTimeMillis() + 100)),
                                "https://alan.turing/", "192.168.0.1", "Turing Machine", "Punch card"
                        )
                ),
                new KeyValue<>(
                        new CustomerKey(1912, "Alan Turing"),
                        new ViewPage("Alan Turing", false,
                                Double.parseDouble(String.valueOf(System.currentTimeMillis() + 200)),
                                "https://alan.turing/", "192.168.0.1", "Turing Machine", "Punch card"
                        )
                )
        );
        requestExpectedValuePairs.put(
                "{project_id: 1912}",
                "{\"success\":true,\"data\":{\"count\":3}}"
        );

        return new KeyValue<>(inputValues, requestExpectedValuePairs);
    }
}

就这样。在拓扑中,我使用基于avro定义生成的java模型(类)。

56lgkhnf

56lgkhnf2#

谢谢你提出的解决方案,但我今天确实找到了一个。serdes由序列化程序和反序列化程序组成:https://kafka.apache.org/11/javadoc/org/apache/kafka/common/serialization/serdes.html#serdefrom-org.apache.kafka.common.serialization.serializer-org.apache.kafka.common.serialization.deserializer-因此我用kafkaavroserializer和kafkaavrodeserializer构建serde,如下所示:

Serde serde = Serdes.serdeFrom(new KafkaAvroSerializer(client), new KafkaAvroDeserializer(client));

实现序列化程序的每个类都可以是serde的一部分。

相关问题