无法从对象生成avro通用记录

b4lqfgs4  于 2021-06-07  发布在  Kafka
关注(0)|答案(2)|浏览(294)

我试图发送avro记录Kafka主题使用Kafka生产者。我有一个用户类,我正在发送该类的对象。下面的代码可以正常工作,如果我使用 avroRecord.put(); 设置每个属性。但是我想要的是从一个对象创建一个通用记录,而不使用avrorecord.put();对于每个属性。
用户类

public class User {
    int id;

    String name;

    public User(int id, String name) {

        super();

        this.id = id;

        this.name = name;

    }

    public int getId() {

        return id;

    }

    public void setId(int id) {

        this.id = id;

    }

    public String getName() {

        return name;

    }

    public void setName(String name) {

        this.name = name;

    }

}

发件人类

import org.apache.avro.Schema;

import org.apache.avro.generic.GenericData;

import org.apache.avro.generic.GenericDatumWriter;

import org.apache.avro.generic.GenericRecord;

import org.apache.avro.io.DatumWriter;

import org.apache.avro.io.Encoder;

import org.apache.avro.io.EncoderFactory;

import org.apache.avro.reflect.ReflectData;

import org.apache.kafka.clients.producer.KafkaProducer;

import org.apache.kafka.clients.producer.ProducerConfig;

import org.apache.kafka.clients.producer.ProducerRecord;

import java.io.ByteArrayOutputStream;

import java.io.IOException;

import java.util.Properties;

import vo.User;

public class Sender {

    public static void main(String[] args) {

        User user = new User(10,"testName");

        Schema schema = ReflectData.get().getSchema(user.getClass());

        GenericRecord avroRecord = new GenericData.Record(schema);

        //working fine

        /*avroRecord.put("id", user.getId());

        avroRecord.put("name", user.getName());*/

        //not working

        DatumWriter<Object> datumWriter = new GenericDatumWriter<Object>(schema);

        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();

        Encoder encoder = EncoderFactory.get().binaryEncoder(outputStream, null);

        try {

            datumWriter.write(user, encoder);

            encoder.flush();

        } catch (IOException e1) {

            e1.printStackTrace();

        }

        ProducerRecord<String, GenericRecord> record = new ProducerRecord<>("avrotesttopic1",avroRecord);

        Properties props = new Properties();

        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");

        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,io.confluent.kafka.serializers.KafkaAvroSerializer.class);

        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,io.confluent.kafka.serializers.KafkaAvroSerializer.class);

        props.put("schema.registry.url", "http://127.0.0.1:8081");

        KafkaProducer<String, GenericRecord> producer = new KafkaProducer<String, GenericRecord>(props);

        try {

            producer.send(record);

            producer.flush();

        } catch (Exception e) {

            e.printStackTrace();

        }

        producer.close();

    }

}

如何将此对象作为avro发布到我的Kafka主题中?
我已经参考了以下链接
https://github.com/akmalmuqeeth/confluent-kafka-spring-demo/blob/master/src/main/java/confluentproducerapp.java
https://findusages.com/search/org.apache.avro.io.datumwriter/write$2?补偿=23
https://www.ctheu.com/2017/03/02/serializing-data-efficiently-with-apache-avro-and-dealing-with-a-schema-registry/
谢谢您。

hrysbysz

hrysbysz1#

你可以用它来完成你正在尝试的事情 ReflectDatumWriter ,唯一的限制是要读取所需的数据 ReflectDatumReader 它将期望和空构造函数作为类的一部分。以下代码正在运行(没有kafka,至少序列化/反序列化)

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.Encoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.reflect.ReflectData;
import org.apache.avro.reflect.ReflectDatumReader;
import org.apache.avro.reflect.ReflectDatumWriter;

import java.io.ByteArrayOutputStream;
import java.io.IOException;

public class Test {
    public static void main(String[] args) throws IOException {

        User user = new User(10, "testName");
        Schema schema = ReflectData.get().getSchema(user.getClass());
        GenericRecord avroRecord = new GenericData.Record(schema);

        ReflectDatumWriter<User> datumWriter = new ReflectDatumWriter<User>(schema);
        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();

        Encoder encoder = EncoderFactory.get().binaryEncoder(outputStream, null);
        datumWriter.write(user,  encoder);
        encoder.flush();

        ReflectDatumReader<Object> reader = new ReflectDatumReader<Object>(schema);
        User after =  (User)reader.read(null, DecoderFactory.get().binaryDecoder(outputStream.toByteArray(), null));
        System.out.println(after.getId());
        System.out.println(after.getName());
    }

    public static class User {
        int id;
        String name;

        public User(){

        }

        public User(int id, String name) {
            super();
            this.id = id;
            this.name = name;
        }

        public int getId() {
            return id;
        }

        public void setId(int id) {
            this.id = id;
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }
    }

}

我建议一定要将schema registry与avroserializer/avrodeserializer一起使用,或者在最坏的情况下使用基于schema的编译类,以确保kafka中主题级的兼容性,并且一定要比反射解决方案执行得更好。
编辑:
如果你想用 KafkaAvroSerializer / KafkaAvroDeserializer 必须为序列化提供支持的对象(可以在此处找到列表)。如您所见,它需要一个基元类型或 IndexedRecord ,这意味着您需要提供一个已编译的avro类或 GenericRecord 要序列化/反序列化,无法直接使用kafkaavro serde实现的pojo对象。
另一个选项是实现您自己的序列化器/反序列化器来处理在我的示例中序列化/序列化的字节数组。

jtjikinw

jtjikinw2#

您应该首先创建avro模式,并使用 avro-tools 或者 avro-maven-plugin . 工作示例可以在这里找到

相关问题