I have defined a schema with 10 fields in the Schema Registry, using confluent-3.3.0 and Kafka 0.10:
{"schema": "{\"type\":\"record\",\"name\":\"User2\",\"fields\":[{\"name\":\"userName\",\"type\":\"string\"},{\"name\":\"uID\",\"type\":\"string\"},{\"name\":\"company\",\"type\":\"string\",\"default\":\"ABC\"},{\"name\":\"age\",\"type\":\"int\",\"default\":0},{\"name\":\"number\",\"type\":\"int\",\"default\":0},{\"name\":\"firstCompany\",\"type\":\"string\",\"default\":\"\"},{\"name\":\"lastCompany\",\"type\":\"string\",\"default\":\"\"},{\"name\":\"mobileNumber\",\"type\":\"int\",\"default\":0},{\"name\":\"email\",\"type\":\"string\",\"default\":\"\"},{\"name\":\"birthYear\",\"type\":\"int\",\"default\":1980}]}"}
Fields without default values: userName, uID. Fields with defaults: company, age, number, firstCompany, lastCompany, mobileNumber, email, birthYear.
The Kafka producer code is below; I pass only the two fields userName and uID to the producer (AvroProducer):
import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class AvroProducer {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        //props.put("acks", "all");
        props.put("retries", 0);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
        props.put("schema.registry.url", "http://localhost:8081");

        String topic = "confluent-new";
        Schema.Parser parser = new Schema.Parser();
        // This schema string comes from the Schema Registry
        Schema schema = parser.parse("{\"type\":\"record\",\"name\":\"User2\",\"fields\":[{\"name\":\"userName\",\"type\":\"string\"},{\"name\":\"uID\",\"type\":\"string\"},{\"name\":\"company\",\"type\":\"string\",\"default\":\"ABC\"},{\"name\":\"age\",\"type\":\"int\",\"default\":0},{\"name\":\"number\",\"type\":\"int\",\"default\":0},{\"name\":\"firstCompany\",\"type\":\"string\",\"default\":\"\"},{\"name\":\"lastCompany\",\"type\":\"string\",\"default\":\"\"},{\"name\":\"mobileNumber\",\"type\":\"int\",\"default\":0},{\"name\":\"email\",\"type\":\"string\",\"default\":\"\"},{\"name\":\"birthYear\",\"type\":\"int\",\"default\":1980}]}");

        Producer<String, GenericRecord> producer = new KafkaProducer<String, GenericRecord>(props);
        GenericRecord record = new GenericData.Record(schema);
        record.put("uID", "06080000");
        record.put("userName", "User data10");

        ProducerRecord<String, GenericRecord> recordData = new ProducerRecord<String, GenericRecord>(topic, "ip", record);
        producer.send(recordData);
        producer.close(); // flush the async send before the JVM exits
        System.out.println("Message Sent");
    }
}
The consumer code is:
import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class AvroConsumer {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "consumer1");
        props.put("auto.offset.reset", "latest");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
        props.put("schema.registry.url", "http://localhost:8081");

        String topic = "confluent-new";
        KafkaConsumer<String, GenericRecord> consumer = new KafkaConsumer<String, GenericRecord>(props);
        consumer.subscribe(Arrays.asList(topic));
        while (true) {
            ConsumerRecords<String, GenericRecord> recs = consumer.poll(10000);
            for (ConsumerRecord<String, GenericRecord> rec : recs) {
                System.out.printf("{AvroConsumer}: Received [key= %s, value= %s]\n", rec.key(), rec.value());
            }
        }
    }
}
When running the code above, I get the following exception on the producer side:
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Exception in thread "main" org.apache.kafka.common.errors.SerializationException: Error serializing Avro message
Caused by: java.lang.NullPointerException: null of string in field location of http
at org.apache.avro.generic.GenericDatumWriter.npe(GenericDatumWriter.java:145)
at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:139)
at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:75)
at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:62)
at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:92)
at kafka.CTKafkaAvroSerializer.serialize(CTKafkaAvroSerializer.java:12)
at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:453)
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:430)
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:353)
at kafka.AvroProducerWithJSON.main(AvroProducerWithJSON.java:64)
Caused by: java.lang.NullPointerException
at org.apache.avro.io.Encoder.writeString(Encoder.java:121)
at org.apache.avro.generic.GenericDatumWriter.writeString(GenericDatumWriter.java:267)
To get past this exception, I populated the defaulted fields (company, age, number, firstCompany, lastCompany, mobileNumber, email, birthYear) with values on the producer side. (As far as I can tell, Avro applies defaults only on the read path; GenericDatumWriter writes every field of the writer schema, so none may be left null.) Now I get the following output on the consumer side:
{AvroConsumer}: Received [key= ip, value= {"userName": "User data10", "uID": "06080000", "company": "ABC", "age": 0, "number": 0, "firstCompany": "", "lastCompany": "", "mobileNumber": 0, "email": "", "birthYear": 1980}]
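As an aside, the manual fill can be written more compactly with Avro's GenericRecordBuilder (a minimal sketch reusing the schema object from the producer above; build() copies the defaults from the schema, so the encoded message still carries them and the consumer output is unchanged):

import org.apache.avro.generic.GenericRecordBuilder;

GenericRecord record = new GenericRecordBuilder(schema)
        .set("userName", "User data10")
        .set("uID", "06080000")
        .build(); // build() fills company, age, etc. from the schema defaults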
So I am getting default values for all the other (unintended) fields. I want to avoid this: the output should contain only the fields I actually intended to pass (userName and uID in this case) and drop all the defaulted fields.
Any pointers on this would be greatly appreciated.
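One idea, sketched under assumptions I have not verified against confluent-3.3.0: produce with a narrower writer schema that contains only the two fields being sent. Since every omitted field has a default, the subject's default BACKWARD compatibility should accept it as a new schema version, and a GenericRecord consumer decodes with the writer schema, so only those two fields would appear:

// Sketch: a reduced writer schema with just the fields actually sent.
// Assumes the subject's compatibility mode (BACKWARD by default) accepts
// dropping fields that have defaults in the earlier version.
Schema writerSchema = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"User2\",\"fields\":["
        + "{\"name\":\"userName\",\"type\":\"string\"},"
        + "{\"name\":\"uID\",\"type\":\"string\"}]}");

GenericRecord rec = new GenericData.Record(writerSchema);
rec.put("userName", "User data10");
rec.put("uID", "06080000");
producer.send(new ProducerRecord<String, GenericRecord>(topic, "ip", rec));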