我正在使用kafka10并从db2cdc接收其中的记录。kafka10使用合流模式注册表来存储db2表模式,并将记录作为avro发送 Array[Byte]
. 我想把这些记录存到 Hbase
(假设为原始hbase),然后使用hive对这些新记录运行一些转换(如删除列、聚合等),并将转换后的记录再次存储到hbase(假设为一致的hbase)。我尝试了两种方法,两种方法都给了我一些问题。记录的长度很大,约有500列(尽管只需要10%的列),而且每条记录的大小约为10kb。
1) 我试着把这些记录反序列化成 Array[Byte]
然后使用 streamBulkPut
方法将其插入hbase。
反序列化程序代码:
def toRecord(buffer: Array[Byte]): Array[Byte] = {
var schemaRegistry: SchemaRegistryClient = null
schemaRegistry= new CachedSchemaRegistryClient(url, 10)
val bb = ByteBuffer.wrap(buffer)
bb.get() // consume MAGIC_BYTE
val schemaId = bb.getInt // consume schemaId //println(schemaId.toString)
val schema = schemaRegistry.getByID(schemaId) // consult the Schema Registry //println(schema)
val reader = new GenericDatumReader[GenericRecord](schema)
val decoder = DecoderFactory.get().binaryDecoder(buffer, bb.position(), bb.remaining(), null)
val writer = new GenericDatumWriter[GenericRecord](schema)
val baos = new ByteArrayOutputStream
val jsonEncoder = EncoderFactory.get.jsonEncoder(schema, baos)
writer.write( reader.read(null, decoder), jsonEncoder) //reader.read(null, decoder): returns Generic record
jsonEncoder.flush
baos.toByteArray
}
hbase批量输出代码:
val messages = KafkaUtils.createDirectStream[Object,Array[Byte],KafkaAvroDecoder,DefaultDecoder](ssc, kafkaParams, topicSet)
val hconf = HBaseConfiguration.create()
val hbaseContext = new HBaseContext(ssc.sparkContext, hconf)
val tableName = "your_table"
var rowKeyArray: Array[String] = null
hbaseContext.streamBulkPut(messages,TableName.valueOf(tableName),putFunction)
def putFunction(avroRecord:Tuple2[Object,Array[Byte]]):Put = {
implicit val formats = DefaultFormats
val recordKey = getKeyString(parse(avroRecord._1.toString.mkString).extract[Map[String,String]].values.mkString)
var put = new Put(Bytes.toBytes(recordKey))
put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("row"), AvroDeserializer.toRecord(avroRecord._2))
put
}
def getKeyString(keystr:String):String = {
(Math.abs(keystr map (_.hashCode) reduceLeft( 31 * _ + _) ) % 10 + 48).toChar + "_" + keystr.trim
}
现在这种方法可行,但是插入速度非常慢。我每分钟的处理量约为5k条记录。计划是一旦记录在原始hbase中,我将使用hive读取并分解json来运行转换。
2) 不是在存储到原始hbase时重新序列化记录,而是在从raw->conformed hbase加载时进行序列化(我可以在这里管理慢度,因为数据已经在我身边了,即从kafka出来)。因此,我尝试将avro记录存储到hbase中,它运行得非常快,我能够在2分钟内插入150万条记录。下面是代码:
hbaseContext.streamBulkPut(messages,TableName.valueOf(tableName),putFunction)
def putFunction(avroRecord:Tuple2[Object,Array[Byte]]):Put = {
implicit val formats = DefaultFormats
val recordKey = parse(avroRecord._1.toString.mkString).extract[Map[String,String]]
var put = new Put(Bytes.toBytes(getKeyString(recordKey.values.mkString)))
put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("row"), avroRecord._2)
put
}
这种方法的问题是hive无法从hbase读取avro记录,并且我无法过滤记录/在其上运行任何逻辑。
我将感谢任何帮助或资源,我可以遵循,以提高性能。如果相应的问题得到解决,任何方法都会对我有用。谢谢
暂无答案!
目前还没有任何答案,快来回答吧!