kafkaavroserializer,用于序列化没有schema.registry.url的avro

2ledvvac  于 2021-06-08  发布在  Kafka
关注(0)|答案(5)|浏览(469)

我是Kafka和阿夫罗的傀儡。所以我一直在努力让生产者/消费者运作起来。到目前为止,我已经能够使用以下方法生成和使用简单的字节和字符串:producer的配置:

Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");

    Schema.Parser parser = new Schema.Parser();
    Schema schema = parser.parse(USER_SCHEMA);
    Injection<GenericRecord, byte[]> recordInjection = GenericAvroCodecs.toBinary(schema);

    KafkaProducer<String, byte[]> producer = new KafkaProducer<>(props);

    for (int i = 0; i < 1000; i++) {
        GenericData.Record avroRecord = new GenericData.Record(schema);
        avroRecord.put("str1", "Str 1-" + i);
        avroRecord.put("str2", "Str 2-" + i);
        avroRecord.put("int1", i);

        byte[] bytes = recordInjection.apply(avroRecord);

        ProducerRecord<String, byte[]> record = new ProducerRecord<>("mytopic", bytes);
        producer.send(record);
        Thread.sleep(250);
    }
    producer.close();
}

现在一切都很好,当我尝试序列化pojo时,问题来了。因此,我能够使用avro提供的实用程序从pojo获取avroschema。对架构进行硬编码,然后尝试创建一个通用记录,以通过kafkaproducer发送,现在将生产者设置为:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.KafkaAvroSerializer");

Schema.Parser parser = new Schema.Parser();
Schema schema = parser.parse(USER_SCHEMA); // this is the Generated AvroSchema
KafkaProducer<String, byte[]> producer = new KafkaProducer<>(props);

这就是问题所在:在我使用kafkaavroserializer的那一刻,由于缺少必需的参数schema.registry.url,生产者没有出现
我仔细阅读了为什么需要这样做,这样我的消费者就能够破译制作人发给我的任何东西。但是这个模式不是已经嵌入到avromessage中了吗?如果有人能与kafkaavroserializer共享一个使用kafkaproducer的工作示例,而不必指定schema.registry.url,那就太好了
也非常感谢您提供有关模式注册表实用程序的任何见解/资源。
谢谢!

jv2fixgn

jv2fixgn1#

首先注意: KafkaAvroSerializer 不是在vanilla apache kafka中提供的-它是由confluent平台提供的(https://www.confluent.io/),作为其开放源代码组件的一部分(http://docs.confluent.io/current/platform.html#confluent-架构注册表)
快速回答:不,如果你使用 KafkaAvroSerializer ,则需要架构注册表。请参见以下示例:http://docs.confluent.io/current/schema-registry/docs/serializer-formatter.html
schema registry的基本思想是,每个主题将引用一个avro模式(即,您将只能发送彼此一致的数据)。但是一个模式可以有多个版本,所以您仍然需要为每个记录标识模式)
我们不想像您暗示的那样为每个数据编写模式—通常,模式比数据大!这将浪费每次读取时解析它的时间,并且浪费资源(网络、磁盘、cpu)
相反,模式注册表示例将执行绑定 avro schema <-> int schemaId 在从注册表中获取数据(并将其缓存以备以后使用)之后,序列化程序将只在数据之前写入这个id。
所以在Kafka里面,你的记录 [<id> <bytesavro>] (出于技术原因,还有magic byte),它的开销只有5个字节(与您的模式大小相比),在读取时,您的使用者将找到对应于id的模式,并将其反序列化为avro字节。你可以找到更多的方法在汇合博士
如果您真的需要为每个记录编写模式,那么您将需要另一个序列化程序(我认为编写自己的序列化程序很容易,只是重复使用https://github.com/confluentinc/schema-registry/blob/master/avro-serializer/src/main/java/io/confluent/kafka/serializers/abstractkafkaavroserializer.java 并移除schema registry部分,以将其替换为schema(与读取相同)。但如果您使用avro,我真的不鼓励这样做-一天之后,您将需要实现类似avro注册表的东西来管理版本控制

thtygnil

thtygnil2#

虽然选中的答案都是正确的,但是还应该提到可以禁用模式注册。
简单设置 auto.register.schemasfalse .

xpszyzbs

xpszyzbs3#

您可以创建自定义的avro serialiser,然后即使没有schema registry,您也可以生成主题的记录。检查下面的文章。
https://codenotfound.com/spring-kafka-apache-avro-serializer-deserializer-example.html
这里他们用的是Kafka模板。我试过使用

KafkaProducer<String, User> UserKafkaProducer

它工作得很好,但是如果你想使用kafkaavroserialiser,你需要给schema registryurl

pjngdqdw

pjngdqdw4#

你总是可以让你的值类实现 Serialiser<T> , Deserialiser<T> (和 Serde<T> 对于Kafka流)手动。java类通常是从avro文件生成的,所以直接编辑并不是一个好主意,但是 Package 可能很冗长,但可能是一种可行的方法。
另一种方法是调优用于java类生成的arvo生成器模板,并自动生成所有这些接口的实现。avromaven和gradle插件都支持自定义模板,所以应该很容易配置。
我创造了https://github.com/artemyarulin/avro-kafka-deserializable 这改变了模板文件和可用于生成文件的简单cli工具

s1ag04yj

s1ag04yj5#

正如其他人所指出的,kafkaavroserializer需要作为合流平台一部分的模式注册中心,而使用需要许可。
使用schema注册表的主要优点是,与为每条消息编写带有schema的二进制有效负载相比,您的有线字节将更小。
我写了一篇博文详细说明了这些优点

相关问题