我有一个kafka生产者的java代码,它将genericrecord作为值发送给kafka代理。我必须通过客户机(Kafka消费者)使用此消息。我已经通过schemaregistry注册了java类的模式,并在c#客户端正确地接收了该模式。但是,当我使用microsoftavro反序列化程序解析字节数组时,创建的avrorecord不包含显示消息的方法。以下是java kafka producer的代码:
public class OrderProducerAvro {
public static void main(String[] args) {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServerCoordinates);
props.put(ProducerConfig.ACKS_CONFIG, "1");
props.put(ProducerConfig.RETRIES_CONFIG, "3");
props.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://10.250.33.228:8081");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
io.confluent.kafka.serializers.KafkaAvroSerializer.class);
int numberOfMessages = 10;
Random rand = new Random();
try (KafkaProducer<String, GenericRecord> producer = new KafkaProducer<>(props)) {
while (numberOfMessages-- > 0) {
//Order order = new Order(numberOfMessages, rand.nextInt(1000), rand.nextInt(2000), LocalDateTime.now().toInstant(ZoneOffset.UTC).toEpochMilli());
Order order = new Order();
order.setOrderName("XYZ");
Schema schema = ReflectData.get().getSchema(order.getClass());
GenericRecord avroRecord = new GenericData.Record(schema);
schema.getFields().forEach(r -> avroRecord.put(r.name(), order.getOrderName() + rand.nextInt(2000)));
producer.send(new ProducerRecord<>("POCtopic1", UUID.randomUUID().toString(), avroRecord));
System.out.println("Sending msg: " + avroRecord);
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
System.out.println("Producer dies here!!!");
}
}
以下是order类的代码:
public class Order {
public String getOrderName() {
return orderName;
}
public void setOrderName(String orderName) {
this.orderName = orderName;
}
String orderName;
}
avrodeserializer的客户机代码是:
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Confluent.Kafka.Serialization;
using Microsoft.Hadoop.Avro.Schema;
using Microsoft.Hadoop.Avro;
using System.Runtime.Serialization;
using Microsoft.Hadoop.Avro.Container;
using System.IO;
namespace ConsoleApplication2
{
class AvroDes : IDeserializer<AvroRecord>
{
String schema;
public AvroDes(String s)
{
this.schema = s;
}
public AvroRecord Deserialize(byte[] data)
{
if (data == null)
{
throw new ArgumentException();
}
// Console.Write(schema);
var serializer = AvroSerializer.CreateGeneric(schema);
var memStream = new MemoryStream();
memStream.Write(data, 0, data.Length);
memStream.Position = 0;
// var rootSchema = serializer.ReaderSchema as RecordSchema;
AvroRecord actual = (AvroRecord)serializer.Deserialize(memStream);
return actual;
}
}
}
order类的架构是:
{"type":"record","name":"Order","namespace":"amit.poc.order","fields":[{"name":"orderName","type":"string"}]}
我想通过如下操作获得java kafka制作人(即“xyz”)发送的名称:-
actual.orderName
请帮助我与这个或只是建议我一些其他的方式发展Kafka消费者在c#
note:- i 我正在为c#客户端使用microsoft.hadoop.avro dll
暂无答案!
目前还没有任何答案,快来回答吧!