我试图为Kafka主题提供数据。作为需求的一部分,我从数据库中提取数据并将其Map到avro模式。我知道如何解析数据,因为模式很简单,如下所示:
简单avro模式:
{
"type":"record",
"namespace":"demo",
"name":"coreloader330",
"doc":"sample avro schema file for core loader 330",
"fields":[
{ "name" : "HEALTH_PLAN", "type" : ["null", "string"]}
]
}
通用生成器java:
public GenericRecordBuilder getGenericBuilder() {
GenericRecordBuilder builder = null;
Schema.Parser parser = new Schema.Parser();
try {
Schema schema = parser.parse(new File("src/main/resources/coreloader.avsc"));
builder = new GenericRecordBuilder(schema);
} catch (IOException e) {
e.printStackTrace();
}
return builder;
将值Map到生成器:
public void produceMessage(){
//Create Producer
KafkaProducer<String, GenericRecord> producer = new KafkaProducer<String, GenericRecord>(props);
try{
while(rs.next()){
// System.out.println(rs.getString("HEALTH_PLAN"));
builder.set("HEALTH_PLAN", rs.getString("HEALTH_PLAN")); ---> mapping values to avro schema field
GenericData.Record data = builder.build();
ProducerRecord<String, GenericRecord> record = new ProducerRecord<String, GenericRecord>(topic, data);
producer.send(record);
producer.flush();
}
但是,如果模式很复杂,如何Map值,如何到达Map的嵌套字段?
假设我有下面这样的模式,如何遍历并将值Map到sourcesystem字段?
"type": "record",
"name": "demo",
"namespace": "example",
"fields": [
{
"name": "data",
"type": {
"type": "record",
"name": "Data",
"fields": [
{
"name": "SourceSystem",
"type": [
"null",
"string"
],
"default": null
}
]
}
}
在下面的代码中,我的Map应该是什么?
builder.set(<?>, rs.getString("source system")); ---> mapping values to avro schema field
暂无答案!
目前还没有任何答案,快来回答吧!