microsoft avrorecord不包含通过avro反序列化的值

o2g1uqev  于 2021-06-07  发布在  Kafka
关注(0)|答案(0)|浏览(197)

我有一个kafka生产者的java代码,它将genericrecord作为值发送给kafka代理。我必须通过客户机(Kafka消费者)使用此消息。我已经通过schemaregistry注册了java类的模式,并在c#客户端正确地接收了该模式。但是,当我使用microsoftavro反序列化程序解析字节数组时,创建的avrorecord不包含显示消息的方法。以下是java kafka producer的代码:

public class OrderProducerAvro {

    public static void main(String[] args) {

        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServerCoordinates);
        props.put(ProducerConfig.ACKS_CONFIG, "1");
        props.put(ProducerConfig.RETRIES_CONFIG, "3");
        props.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://10.250.33.228:8081");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                io.confluent.kafka.serializers.KafkaAvroSerializer.class);

        int numberOfMessages = 10;
        Random rand = new Random();
        try (KafkaProducer<String, GenericRecord> producer = new KafkaProducer<>(props)) {

            while (numberOfMessages-- > 0) {
                //Order order = new Order(numberOfMessages, rand.nextInt(1000), rand.nextInt(2000), LocalDateTime.now().toInstant(ZoneOffset.UTC).toEpochMilli());
               Order order = new Order();
                order.setOrderName("XYZ");
                Schema schema = ReflectData.get().getSchema(order.getClass());
                GenericRecord avroRecord = new GenericData.Record(schema);

                schema.getFields().forEach(r -> avroRecord.put(r.name(), order.getOrderName() + rand.nextInt(2000)));

                producer.send(new ProducerRecord<>("POCtopic1", UUID.randomUUID().toString(), avroRecord));
                System.out.println("Sending msg: " + avroRecord);
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
        System.out.println("Producer dies here!!!");
    }
}

以下是order类的代码:

public class Order {
    public String getOrderName() {
        return orderName;
    }

    public void setOrderName(String orderName) {
        this.orderName = orderName;
    }

    String orderName;

}

avrodeserializer的客户机代码是:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Confluent.Kafka.Serialization;
using Microsoft.Hadoop.Avro.Schema;
using Microsoft.Hadoop.Avro;
using System.Runtime.Serialization;
using Microsoft.Hadoop.Avro.Container;
using System.IO;

namespace ConsoleApplication2
{
    class AvroDes : IDeserializer<AvroRecord>
    {
        String schema;
        public AvroDes(String s)
        {
            this.schema = s;
        }

        public AvroRecord Deserialize(byte[] data)
        {
            if (data == null)
            {
                throw new ArgumentException();
            }
          //  Console.Write(schema);

            var serializer = AvroSerializer.CreateGeneric(schema);
            var memStream = new MemoryStream();

            memStream.Write(data, 0, data.Length);
            memStream.Position = 0;

           // var rootSchema = serializer.ReaderSchema as RecordSchema;

            AvroRecord actual = (AvroRecord)serializer.Deserialize(memStream);

            return actual;
        }

    }
}

order类的架构是:

{"type":"record","name":"Order","namespace":"amit.poc.order","fields":[{"name":"orderName","type":"string"}]}

我想通过如下操作获得java kafka制作人(即“xyz”)发送的名称:-

actual.orderName

请帮助我与这个或只是建议我一些其他的方式发展Kafka消费者在c#
note:- i 我正在为c#客户端使用microsoft.hadoop.avro dll

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题