Spring Boot Kafka 侦听器的 Confluent Kafka Avro 反序列化程序

w7t8yxp5  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(334)

是否有人实现过 Confluent Kafka 消息反序列化程序,以便通过 Spring 的 @KafkaListener 来消费 Kafka 消息?

cuxqih21

cuxqih211#

下面是我的答案,我是基于 io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer 实现的:

  1. import java.io.IOException;
  2. import java.nio.ByteBuffer;
  3. import java.util.Arrays;
  4. import java.util.Map;
  5. import javax.xml.bind.DatatypeConverter;
  6. import org.apache.avro.Schema;
  7. import org.apache.avro.io.DatumReader;
  8. import org.apache.avro.io.DecoderFactory;
  9. import org.apache.avro.specific.SpecificDatumReader;
  10. import org.apache.avro.specific.SpecificRecordBase;
  11. import org.apache.kafka.common.errors.SerializationException;
  12. import org.apache.kafka.common.serialization.Deserializer;
  13. import org.slf4j.Logger;
  14. import org.slf4j.LoggerFactory;
  15. public class AvroConfluentDeserializer<T extends SpecificRecordBase> implements Deserializer<T> {
  16. private static final Logger LOG = LoggerFactory.getLogger(AvroConfluentDeserializer.class);
  17. protected static final byte MAGIC_BYTE = 0x0;
  18. protected static final int idSize = 4;
  19. private final DecoderFactory decoderFactory = DecoderFactory.get();
  20. protected final Class<T> targetType;
  21. public AvroConfluentDeserializer(Class<T> targetType) {
  22. this.targetType = targetType;
  23. }
  24. @Override
  25. public void close() {
  26. // No-op
  27. }
  28. @Override
  29. public void configure(Map<String, ?> arg0, boolean arg1) {
  30. // No-op
  31. }
  32. @Override
  33. public T deserialize(String topic, byte[] data) {
  34. try {
  35. T result = null;
  36. if (data != null) {
  37. LOG.info("data='{}'", DatatypeConverter.printHexBinary(data));
  38. result = (T) deserializePayload(data, targetType.newInstance().getSchema());
  39. LOG.info("deserialized data='{}'", result);
  40. }
  41. return result;
  42. } catch (Exception ex) {
  43. throw new SerializationException(
  44. "Can't deserialize data '" + Arrays.toString(data) + "' from topic '" + topic + "'", ex);
  45. }
  46. }
  47. protected T deserializePayload(byte[] payload, Schema schema) throws SerializationException {
  48. int id = -1;
  49. try {
  50. ByteBuffer buffer = getByteBuffer(payload);
  51. id = buffer.getInt();
  52. int length = buffer.limit() - 1 - idSize;
  53. int start = buffer.position() + buffer.arrayOffset();
  54. DatumReader<T> reader = new SpecificDatumReader<T>(schema);
  55. return reader.read(null, decoderFactory.binaryDecoder(buffer.array(), start, length, null));
  56. } catch (IOException | RuntimeException e) {
  57. throw new SerializationException("Error deserializing Avro message for id " + id, e);
  58. }
  59. }
  60. private ByteBuffer getByteBuffer(byte[] payload) {
  61. ByteBuffer buffer = ByteBuffer.wrap(payload);
  62. if (buffer.get() != MAGIC_BYTE) {
  63. throw new SerializationException("Unknown magic byte!");
  64. }
  65. return buffer;
  66. }
  67. }
展开查看全部

相关问题