scala—将通用avro记录序列化为数组[byte],将模式保留在对象中

cxfofazt  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(428)

情况
我目前正在使用avro和模式存储库编写一个消费者/生产者。
据我所知,序列化这些数据的方法要么使用confluent的avro序列化程序,要么使用twitter的双射。
似乎双射看起来最直接。
所以我想用以下格式生成日期 ProducerRecord[String,Array[Byte]] ,这归结为[some string id,serialized genericord]
(注意:我使用的是通用记录,因为这个代码库必须处理从json/csv/…解析的数千个模式)
问题:
我序列化并使用avro的全部原因是,您不需要在数据本身中有模式(就像使用json/xml/…)。
但是,在检查主题中的数据时,我看到整个方案都包含在数据中。我是在做一些根本错误的事情,这是设计的,还是应该改用合流序列化程序?
代码:

def jsonStringToAvro(jString: String, schema: Schema): GenericRecord = {
    val converter = new JsonAvroConverter
    val genericRecord = converter.convertToGenericDataRecord(jString.replaceAll("\\\\/","_").getBytes(), schema)

    genericRecord
  }
def serializeAsByteArray(avroRecord: GenericRecord): Array[Byte] = {
    //val genericRecordInjection = GenericAvroCodecs.toBinary(avroRecord.getSchema)
    val r: Array[Byte] = GenericAvroCodecs.toBinary(avroRecord.getSchema).apply(avroRecord)

    r
  }

//schema comes from a rest call to the schema repository
new ProducerRecord[String, Array[Byte]](topic, myStringKeyGoesHere, serializeAsByteArray(jsonStringToAvro(jsonObjectAsStringGoesHere, schema)))

        producer.send(producerRecord, new Callback {...})
b91juud3

b91juud31#

如果您查看汇合的源代码,您将看到与模式存储库交互的操作顺序如下
从avro记录中获取模式,并计算其id。理想情况下,将模式发布到存储库,或者对其进行哈希处理,应该会得到一个id。
分配bytebuffer
将返回的id写入缓冲区
将avro对象值(不包括模式)作为字节写入缓冲区
把那个字节缓冲区发给Kafka
目前,双射用法将包括字节中的模式,而不是用id替换它

相关问题