我试图发送avro记录Kafka主题使用Kafka生产者。我有一个用户类,我正在发送该类的对象。下面的代码可以正常工作,如果我使用 avroRecord.put();
设置每个属性。但是我想要的是从一个对象创建一个通用记录,而不使用avrorecord.put();对于每个属性。
用户类
public class User {
int id;
String name;
public User(int id, String name) {
super();
this.id = id;
this.name = name;
}
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
}
发件人类
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.Encoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.reflect.ReflectData;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Properties;
import vo.User;
public class Sender {
public static void main(String[] args) {
User user = new User(10,"testName");
Schema schema = ReflectData.get().getSchema(user.getClass());
GenericRecord avroRecord = new GenericData.Record(schema);
//working fine
/*avroRecord.put("id", user.getId());
avroRecord.put("name", user.getName());*/
//not working
DatumWriter<Object> datumWriter = new GenericDatumWriter<Object>(schema);
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
Encoder encoder = EncoderFactory.get().binaryEncoder(outputStream, null);
try {
datumWriter.write(user, encoder);
encoder.flush();
} catch (IOException e1) {
e1.printStackTrace();
}
ProducerRecord<String, GenericRecord> record = new ProducerRecord<>("avrotesttopic1",avroRecord);
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,io.confluent.kafka.serializers.KafkaAvroSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,io.confluent.kafka.serializers.KafkaAvroSerializer.class);
props.put("schema.registry.url", "http://127.0.0.1:8081");
KafkaProducer<String, GenericRecord> producer = new KafkaProducer<String, GenericRecord>(props);
try {
producer.send(record);
producer.flush();
} catch (Exception e) {
e.printStackTrace();
}
producer.close();
}
}
如何将此对象作为avro发布到我的Kafka主题中?
我已经参考了以下链接
https://github.com/akmalmuqeeth/confluent-kafka-spring-demo/blob/master/src/main/java/confluentproducerapp.java
https://findusages.com/search/org.apache.avro.io.datumwriter/write$2?补偿=23
https://www.ctheu.com/2017/03/02/serializing-data-efficiently-with-apache-avro-and-dealing-with-a-schema-registry/
谢谢您。
2条答案
按热度按时间hrysbysz1#
你可以用它来完成你正在尝试的事情
ReflectDatumWriter
,唯一的限制是要读取所需的数据ReflectDatumReader
它将期望和空构造函数作为类的一部分。以下代码正在运行(没有kafka,至少序列化/反序列化)我建议一定要将schema registry与avroserializer/avrodeserializer一起使用,或者在最坏的情况下使用基于schema的编译类,以确保kafka中主题级的兼容性,并且一定要比反射解决方案执行得更好。
编辑:
如果你想用
KafkaAvroSerializer
/KafkaAvroDeserializer
必须为序列化提供支持的对象(可以在此处找到列表)。如您所见,它需要一个基元类型或IndexedRecord
,这意味着您需要提供一个已编译的avro类或GenericRecord
要序列化/反序列化,无法直接使用kafkaavro serde实现的pojo对象。另一个选项是实现您自己的序列化器/反序列化器来处理在我的示例中序列化/序列化的字节数组。
jtjikinw2#
您应该首先创建avro模式,并使用
avro-tools
或者avro-maven-plugin
. 工作示例可以在这里找到