在围棋中使用kafka avro信息

zazmityj  于 2021-06-07  发布在  Kafka
关注(0)|答案(2)|浏览(328)

我正在尝试使用avro格式的kafka消息,但是我无法在go中将avro中的消息解码为json。
我使用的是汇合平台(3.0.1)。例如,我生成如下avro消息:

kafka-avro-console-producer --broker-list localhost:9092 --topic test --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}'
{"f1":"message1"}
{"f1":"message2"}

现在我使用go kafka图书馆的信息:sarama。纯文本信息工作正常。avro信息必须解码。我发现了不同的libs:github.com/linkedin/goavro,github.com/elodina/go-avro
但是在解码之后,我得到了一个没有值的json(两个libs):

{"f1":""}

目标:

avroSchema := `
{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}
`
codec, err := goavro.NewCodec(avroSchema)
if err != nil {
    log.Fatal(err)
}
bb := bytes.NewBuffer(msg.Value)
decoded, err := codec.Decode(bb)
log.Println(fmt.Sprintf("%s", decoded))

转到avro:

schema := avro.MustParseSchema(avroSchema)
reader := avro.NewGenericDatumReader()
reader.SetSchema(schema)
decoder := avro.NewBinaryDecoder(msg.Value)
decodedRecord := avro.NewGenericRecord(schema)
log.Println(decodedRecord.String())

msg=sarama.consumermessage消息

vlju58qv

vlju58qv1#

刚刚发现(通过比较二进制avro消息),我必须删除消息字节数组的前5个元素-现在一切正常了:)

message = msg.Value[5:]

也许有人能解释为什么

v9tzhpje

v9tzhpje2#

第一个字节是魔法字节(0)。以下4个字节是avro模式id
只有在使用合流模式注册表时,它才真正有用。

相关问题