我已经编写了一个类,用于将uuid类型的对象自定义编码为要通过kafka和avro传输的字节。
为了使用这个类,我把 @AvroEncode(using=UUIDAsBytesEncoding.class)
在目标对象中的uuid变量上方(这是由apacheavro反射库实现的)
我很难弄清楚如何让我的消费者自动使用自定义解码器(或者我必须进去手动解码吗?)。
以下是我的uuidasbytesencoder扩展的customencoding类:
public class UUIDAsBytesEncoding extends CustomEncoding<UUID> {
public UUIDAsBytesEncoding() {
List<Schema> union = Arrays.asList(Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.BYTES));
union.get(1).addProp("CustomEncoding", "UUIDAsBytesEncoding");
schema = Schema.createUnion(union);
}
@Override
protected void write(Object datum, Encoder out) throws IOException {
if(datum != null) {
// encode the position of the data in the union
out.writeLong(1);
// convert uuid to bytes
byte[] bytes = new byte[16];
Conversion.uuidToByteArray(((UUID) datum),bytes,0,16);
// encode length of data
out.writeLong(16);
// write the data
out.writeBytes(bytes);
} else {
// position of null in union
out.writeLong(0);
}
}
@Override
protected UUID read(Object reuse, Decoder in) throws IOException {
System.out.println("READING");
Long size = in.readLong();
Long leastSig = in.readLong();
Long mostSig = in.readLong();
return new UUID(mostSig, leastSig);
}
}
write方法和编码工作得很好,但是read方法在反序列化时永远不会被调用。如何在消费者中实现这一点?
注册表上的架构如下所示:
{“type”:“record”,“name”:“request”,“namespace”:“x..”,“fields”:[{“name”:“password”,“type”:“string”},{“name”:“email”,“type”:“string”},{“name”:“id”,“type”:[“null”,{“type”:“bytes”,“customencoding”:“uuidasbytesencoding”}],“default”:null}}`
如果使用者不能自动使用该信息来使用uuidasbytesencoding read方法,那么我如何在我的使用者中找到标记有该标记的数据?
我也在使用合流模式注册表。
任何帮助都将不胜感激!
1条答案
按热度按时间vql8enpb1#
最终找到了解决办法。编码不正确--内置的writebytes()方法会自动为您写入长度。
然后在consumer中,我们必须通过genericdatumwriter,写入二进制流,然后用reflectdatumreader从二进制流中读取。这将自动调用uuiasbytesencoding read()方法并反序列化uuid。
我的消费者看起来像这样(作为消费者组executor服务演练的一部分):
那么新的uuidasbytesen编码将如下所示:
这也是一个如何实现customencoding avro类的示例。avro的当前版本没有内置uuid序列化程序,因此这是解决该问题的一种方法。