How can I drop the default-value fields of Avro data on the Kafka Avro consumer side?

zphenhs4, published 2021-06-06 in Kafka

I have defined a schema with 10 fields in the Schema Registry, using Confluent 3.3.0 and Kafka 0.10.

{"schema": "{\"type\":\"record\",\"name\":\"User2\",\"fields\":[{\"name\":\"userName\",\"type\":\"string\"},{\"name\":\"uID\",\"type\":\"string\"},{\"name\":\"company\",\"type\":\"string\",\"default\":\"ABC\"},{\"name\":\"age\",\"type\":\"int\",\"default\":0},{\"name\":\"number\",\"type\":\"int\",\"default\":0},{\"name\":\"firstCompany\",\"type\":\"string\",\"default\":\"\"},{\"name\":\"lastCompany\",\"type\":\"string\",\"default\":\"\"},{\"name\":\"mobileNumber\",\"type\":\"int\",\"default\":0},{\"name\":\"email\",\"type\":\"string\",\"default\":\"\"},{\"name\":\"birthYear\",\"type\":\"int\",\"default\":1980}]}"}

Fields without default values: userName, uID. Fields with default values: company, age, number, firstCompany, lastCompany, mobileNumber, email, birthYear.
The Kafka producer code is below. I pass only two fields, userName and uID, to the producer (AvroProducer).

import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class AvroProducer {
    public static void main(String[] args) throws ExecutionException, InterruptedException {

        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        //props.put("acks", "all");
        props.put("retries", 0);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
        props.put("schema.registry.url", "http://localhost:8081");
        String topic = "confluent-new";

        Schema.Parser parser = new Schema.Parser();
        // This schema string comes from the Schema Registry
        Schema schema = parser.parse("{\"type\":\"record\",\"name\":\"User2\",\"fields\":[{\"name\":\"userName\",\"type\":\"string\"},{\"name\":\"uID\",\"type\":\"string\"},{\"name\":\"company\",\"type\":\"string\",\"default\":\"ABC\"},{\"name\":\"age\",\"type\":\"int\",\"default\":0},{\"name\":\"number\",\"type\":\"int\",\"default\":0},{\"name\":\"firstCompany\",\"type\":\"string\",\"default\":\"\"},{\"name\":\"lastCompany\",\"type\":\"string\",\"default\":\"\"},{\"name\":\"mobileNumber\",\"type\":\"int\",\"default\":0},{\"name\":\"email\",\"type\":\"string\",\"default\":\"\"},{\"name\":\"birthYear\",\"type\":\"int\",\"default\":1980}]}");

        Producer<String, GenericRecord> producer = new KafkaProducer<String, GenericRecord>(props);
        GenericRecord record = new GenericData.Record(schema);
        // Only two of the ten fields are set explicitly
        record.put("uID", "06080000");
        record.put("userName", "User data10");

        ProducerRecord<String, GenericRecord> recordData = new ProducerRecord<String, GenericRecord>(topic, "ip", record);
        producer.send(recordData);
        producer.flush();
        producer.close();

        System.out.println("Message Sent");
    }
}

The consumer code is:

import java.util.Arrays;
import java.util.Properties;

import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class AvroConsumer {
    public static void main(String[] args) {

        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "consumer1");
        props.put("auto.offset.reset", "latest");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
        props.put("schema.registry.url", "http://localhost:8081");
        String topic = "confluent-new";

        KafkaConsumer<String, GenericRecord> consumer = new KafkaConsumer<String, GenericRecord>(props);
        consumer.subscribe(Arrays.asList(topic));
        while (true) {
            ConsumerRecords<String, GenericRecord> recs = consumer.poll(10000);
            for (ConsumerRecord<String, GenericRecord> rec : recs) {
                System.out.printf("{AvroConsumer}: Received [key= %s, value= %s]%n", rec.key(), rec.value());
            }
        }
    }
}

Running the code above, I get the following exception on the producer side:

log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Exception in thread "main" org.apache.kafka.common.errors.SerializationException: Error serializing Avro message
Caused by: java.lang.NullPointerException: null of string in field location of http
    at org.apache.avro.generic.GenericDatumWriter.npe(GenericDatumWriter.java:145)
    at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:139)
    at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:75)
    at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:62)
    at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:92)
    at kafka.CTKafkaAvroSerializer.serialize(CTKafkaAvroSerializer.java:12)
    at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:453)
    at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:430)
    at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:353)
    at kafka.AvroProducerWithJSON.main(AvroProducerWithJSON.java:64)
Caused by: java.lang.NullPointerException
    at org.apache.avro.io.Encoder.writeString(Encoder.java:121)
    at org.apache.avro.generic.GenericDatumWriter.writeString(GenericDatumWriter.java:267)

To get past this exception, I populated the default-value fields (company, age, number, firstCompany, lastCompany, mobileNumber, email, birthYear) explicitly on the producer side, using the defaults from the schema itself. Now I get the following output on the consumer side:
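For context, `GenericData.Record` does not apply schema defaults at serialization time, which is why the unset string fields produced the NullPointerException. Avro's `GenericRecordBuilder` fills in the defaults when `build()` is called, which would avoid hand-populating each field. The class name `DefaultFiller` below is hypothetical; this is a library-free sketch of the same "defaults first, explicit values on top" logic, not the actual Avro implementation:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch of what GenericRecordBuilder.build() does with schema defaults:
// start from the default values, then overwrite with the fields that
// were set explicitly. (Hypothetical class, for illustration only.)
public class DefaultFiller {

    public static Map<String, Object> fillDefaults(Map<String, Object> explicit,
                                                   Map<String, Object> defaults) {
        Map<String, Object> out = new LinkedHashMap<>(defaults);
        out.putAll(explicit); // explicitly set fields win over defaults
        return out;
    }

    public static void main(String[] args) {
        Map<String, Object> explicit = new LinkedHashMap<>();
        explicit.put("userName", "User data10");
        explicit.put("uID", "06080000");

        Map<String, Object> defaults = new LinkedHashMap<>();
        defaults.put("company", "ABC");
        defaults.put("birthYear", 1980);

        System.out.println(fillDefaults(explicit, defaults));
    }
}
```

Note that either way the serialized record carries concrete values for all ten fields, which is exactly what leads to the output below.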

{AvroConsumer}: Received [key= ip, value= {"userName": "User data10", "uID": "06080000", "company": "ABC", "age": 0, "number": 0, "firstCompany": "", "lastCompany": "", "mobileNumber": 0, "email": "", "birthYear": 1980}]

So the consumer receives the default values for all the other (unintended) fields. I want to avoid this: the output should contain only the fields I actually passed (here userName and uID), with all default-value fields dropped.
Any pointers on this would be much appreciated.
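One consumer-side workaround, assuming it is acceptable to treat "equals the schema default" as "was not set" (a field a producer deliberately sets to its default value would also be dropped), is to compare each field against its default and keep only the ones that differ. With the Avro API this would iterate `schema.getFields()` and compare `record.get(field.name())` against `field.defaultVal()`; the class and method names below (`DefaultFieldFilter.dropDefaults`) are hypothetical, and the core logic is shown library-free with plain maps:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch: drop every field whose value equals its schema default.
// (Hypothetical helper, for illustration only.)
public class DefaultFieldFilter {

    public static Map<String, Object> dropDefaults(Map<String, Object> record,
                                                   Map<String, Object> defaults) {
        Map<String, Object> kept = new LinkedHashMap<>();
        for (Map.Entry<String, Object> e : record.entrySet()) {
            Object def = defaults.get(e.getKey()); // null => no default defined
            if (def == null || !def.equals(e.getValue())) {
                kept.put(e.getKey(), e.getValue());
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        Map<String, Object> record = new LinkedHashMap<>();
        record.put("userName", "User data10");
        record.put("uID", "06080000");
        record.put("company", "ABC");
        record.put("age", 0);
        record.put("birthYear", 1980);

        Map<String, Object> defaults = new LinkedHashMap<>();
        defaults.put("company", "ABC");
        defaults.put("age", 0);
        defaults.put("birthYear", 1980);

        // Keeps only the fields that differ from their defaults
        System.out.println(dropDefaults(record, defaults));
    }
}
```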
