如何在kafka反序列化程序中实现反序列化方法?

flvtvl50  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(476)

我正在做我的第一个ApacheKafka消费者。所以这个例子看起来不错。https://www.safaribooksonline.com/library/view/kafka-the-definitive/9781491936153/ch04.html 我在实现反序列化方法时遇到问题。假设我在customer中有address字段。我想我应该做addressdeserializer,但是如何知道addressdeserializer需要读取多少字节呢?地址有3个字符串字段,但有时其中一些字段为空。我应该传递3*8字节吗?我想这不是解决办法。另外,buffer.get(namebytes)方法对我来说似乎很不自然,因为in参数同时也是out参数,这是一种糟糕的做法。这是检索字节的正确方法还是我遗漏了什么?先谢谢你。

@Override
  public Customer deserialize(String topic, byte[] data) {

    int id;
    int nameSize;
    String name;

    try {
      if (data == null)
        return null;
      if (data.length < 8)
        throw new SerializationException("Size of data received by IntegerDeserializer is shorter than expected");

      ByteBuffer buffer = ByteBuffer.wrap(data);
      id = buffer.getInt();
      String nameSize = buffer.getInt();

      byte[] nameBytes = new Array[Byte](nameSize);
      buffer.get(nameBytes);
      name = new String(nameBytes, 'UTF-8');

      return new Customer(id, name); 2

    } catch (Exception e) {
      throw new SerializationException("Error when serializing Customer to byte[] " + e);
    }
  }
svmlkihl

svmlkihl1#

在这里,应该使用Jackson图书馆。添加 ObjectMapper 作为依赖项,然后:

return objectMapper.readValue(data, Customer.class);

相关问题