使用flume序列化程序生成复合hbase行键

bogh5gae  于 2021-06-02  发布在  Hadoop
关注(0)|答案(1)|浏览(241)

我有这样的地理信息系统数据-

'111, 2011-02-01 20:30:30, 116.50443, 40.00951'  
'111, 2011-02-01 20:30:31, 116.50443, 40.00951'  
'112, 2011-02-01 20:30:30, 116.58197, 40.06665'  
'112, 2011-02-01 20:30:31, 116.58197, 40.06665'

第一列是 driver_id 其次是 timestamp 三是 longitude &第四是 latitude .
我正在使用flume接收这种类型的数据&我的接收器是hbase(类型- AsyncHBaseSink ).
默认情况下,hbase将rowkey指定为第一列(如111)。我想创建一个复合的rowkey(比如前两列的组合111\u 2011-02-01 20:30:30)。
我试着把所需的改变 AsyncHbaseLogEventSerializer.java “但他们没有反映出来。
请建议我如何做同样的事。

x6492ojm

x6492ojm1#

复合密钥应在asynchbaseserializer中工作
下面是示例代码段。
在类级别声明 privae List<PutRequest> puts = null; ```
/**
* Method joinRowKeyContent. (with EMPTY string separation)
*
* Joiner is google guava class
* @param objArray Object...
*
* @return String
*/
public static String joinRowKeyContent(Object... objArray) {
return Joiner.on("").appendTo(new StringBuilder(), objArray).toString();
}

/**
* Method preParePutRequestForBody.
*
* @param rowKeyBytes
* @param timestamp
*/
private void preParePutRequest(final byte[] rowKeyBytes, final long timestamp) {
// Process

        LOG.debug("Processing ..." + Bytes.toString(rowKeyBytes));

    final PutRequest putreq = new PutRequest(table, rowKeyBytes, colFam, Bytes.toBytes("yourcolumn"), yourcolumnasBytearray, timestamp);
    puts.add(putreq);
}
你的get actions方法看起来像。。。

@Override
public List getActions() {
//create rowkey like this
final String rowKey = joinRowKeyContent(driver_id, timestamp, longitude , latitude);

// call prepare put requests method here 
final byte[] rowKeyBytes = Bytes.toBytes(rowKey);
            puts.clear();
 preParePutRequest(rowKeyBytes ,<timestamp>)
        return puts;
    }

相关问题