java中的avro到json转换器

lqfhib0f  于 2021-09-13  发布在  Java
关注(0)|答案(1)|浏览(484)

我将kafka中的avro作为一个字符串来阅读,并尝试使用java代码将字符串avro转换为json。

  1. @KafkaListener(topics = "#{'${kafka.consumer.topics}'.split(',')}", containerFactory = "kafkaListenerContainerFactory")
  2. void listener(String message, Acknowledgment acknowledgment, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) throws CacheServiceException, JsonProcessingException {
  3. //some code here
  4. byte[] data = message.getBytes(); //This seems to be the issue
  5. avroToJson(schema,data)
  6. }
  7. //Code to convert avro to json
  8. public String avroToJson(Schema schema, byte[] avroBinary) throws IOException {
  9. DatumReader<Object> datumReader = new GenericDatumReader<>(schema);
  10. Decoder decoder = DecoderFactory.get().binaryDecoder(avroBinary, null);
  11. Object avroDatum = datumReader.read(null, decoder);
  12. System.out.println("Initiating loop");
  13. try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
  14. DatumWriter<Object> writer = new GenericDatumWriter<>(schema);
  15. JsonEncoder encoder = EncoderFactory.get().jsonEncoder(schema, baos, false);
  16. writer.write(avroDatum, encoder);
  17. encoder.flush();
  18. baos.flush();
  19. return new String(baos.toByteArray(), StandardCharsets.UTF_8);
  20. }
  21. }

我希望避免从Kafka以avro的形式读取数据,因为我在同一Kafka中读取不同主题、不同模式的数据。

gg58donl

gg58donl1#

avro用于根据模式验证数据。如果您不想这样做,只需删除avro并仅使用Kafka即可。从生产者端发送json数据,而不是avro序列化数据。

相关问题