MRUnit: passing values in an HBase Result object

vtwuwzda  posted on 2021-06-09  in HBase

I am testing my mapper with MRUnit. From the test class I pass a key and a list of KeyValues to the mapper as input. This is the code:

String key = "1234_abc";
ArrayList<KeyValue> list = new ArrayList<KeyValue>();
KeyValue k1 = new KeyValue(Bytes.toBytes(key),"cf".getBytes(), "Val1".getBytes(),Bytes.toBytes("abc.com"));
KeyValue k2 = new KeyValue(Bytes.toBytes(key), "cf".getBytes(), "Val2".getBytes(),Bytes.toBytes("165"));
// Add both KeyValues to the list before building the Result.
list.add(k1);
list.add(k2);
Result result = new Result(list);
mapDriver.withInput(key, result);

The problem is that only the first KeyValue is retained in the Result object. The others come back as null.


r6vfmomb  #1

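I went through six hours of pain on this problem myself and finally found the cause. It appears to be a bug in the org.apache.hadoop.hbase.client.Result class, at least in the HBase version I am using (0.94.18).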

// The below line of code was failing for me when running locally under MRUnit
// but it seemed to succeed when running in production on my cluster.
// org.apache.hadoop.hbase.client.Result result passed in to this method.
Bytes.toString(result.getValue(Constants.CF1, Constants.REG_STATUS_FLAG_BYTES));

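result.getValue() calls getColumnLatest(), which in turn calls binarySearch(). The binarySearch() method appears to be faulty and almost always returns the wrong index. getColumnLatest() double-checks that it really found the right KeyValue by making sure the family and qualifier match. They usually do not match, so null is returned.
I ended up reimplementing getValue() and the three methods it uses, and switched my unit tests over to that functionally correct implementation. There may be a better way to do this, but it was late and this is what I came up with (it does solve the problem):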

// Usage: Pass the Result into the newly created getValue() method, rather than
// calling getValue() on the Result object.
Bytes.toString(getValue(result, Constants.CF1, Constants.REG_STATUS_FLAG_BYTES));

// Reimplemented Methods:
private byte[] getValue(Result result, byte [] family, byte [] qualifier) {
  KeyValue kv = getColumnLatest(result, family, qualifier);
  if (kv == null) {
    return null;
  }
  return kv.getValue();
}

private KeyValue getColumnLatest(Result result,  byte[] family, byte[] qualifier) {    
  KeyValue [] kvs = result.raw(); // side effect possibly.
  if (kvs == null || kvs.length == 0) {
    return null;
  }
  //int pos = binarySearch(kvs, family, qualifier);
  int pos = linearSearch(kvs, family, qualifier);
  if (pos == -1) {
    return null;
  }
  KeyValue kv = kvs[pos];
  if (kv.matchingColumn(family, qualifier)) {
    return kv;
  }
  return null;
}

private int linearSearch(final KeyValue [] kvs, final byte [] family,
  final byte [] qualifier) {

  int pos = -1;
  int index = 0;
  for (KeyValue kv : kvs) {
    if (byteArraysEqual(family, kv.getFamily()) && byteArraysEqual(qualifier, kv.getQualifier())) {
      pos = index;
      break;
    }
    index++;
  }
  return pos;
}

private boolean byteArraysEqual(final byte[] ba1, final byte[] ba2) {    
  if (ba1 == null || ba2 == null) {
    return false;
  }

  if (ba1.length != ba2.length) {
    return false;
  }

  for (int i = 0; i < ba1.length; i++) {
    if (ba1[i] != ba2[i]) {
      return false;
    }
  }

  return true;
}
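
The lookup flow described above (binary search, then a check that the hit really matches) can be reproduced in plain Java without HBase. The following is a minimal sketch under stated assumptions, not the HBase source: strings stand in for KeyValue qualifiers, and the class and method names are hypothetical. It only shows how a binary-search-based lookup behaves when its input array is not sorted. It does not establish what happens inside HBase 0.94.18.

```java
// Simplified stand-in for a binary-search-based column lookup.
// Not HBase code: strings play the role of KeyValue qualifiers.
class BinarySearchLookupSketch {

    // Textbook binary search; only correct when the array is sorted.
    static int binarySearch(String[] qualifiers, String wanted) {
        int low = 0;
        int high = qualifiers.length - 1;
        while (low <= high) {
            int mid = (low + high) >>> 1;
            int cmp = qualifiers[mid].compareTo(wanted);
            if (cmp < 0) {
                low = mid + 1;
            } else if (cmp > 0) {
                high = mid - 1;
            } else {
                return mid;
            }
        }
        return -1; // search interval exhausted without a match
    }

    // Returns the matching entry, or null when the search misses.
    static String lookup(String[] qualifiers, String wanted) {
        int pos = binarySearch(qualifiers, wanted);
        return pos == -1 ? null : qualifiers[pos];
    }

    public static void main(String[] args) {
        // Insertion order, i.e. NOT sorted.
        String[] unsorted = {"cone", "ctwo", "cthree", "cfour", "cfive", "csix"};
        String[] sorted = unsorted.clone();
        java.util.Arrays.sort(sorted);

        for (String q : unsorted) {
            System.out.println(q + ": unsorted=" + lookup(unsorted, q)
                    + ", sorted=" + lookup(sorted, q));
        }
    }
}
```

On the unsorted array the search misses entries such as "ctwo" and "cfour" even though they are present, while every lookup succeeds once the array is sorted. The linear search above avoids the problem because it does not depend on ordering.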

ipakzgxi  #2

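The problem is that HBase stores columns in lexicographic order. The Result(KeyValue[] kvs) and Result(List<KeyValue> kvs) constructors expect the KeyValues in that same order.
Here is the solution: build the array from a TreeSet ordered by KeyValue.COMPARATOR.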

TreeSet<KeyValue> set = new TreeSet<KeyValue>(KeyValue.COMPARATOR);

byte[] row = Bytes.toBytes("row01");
byte[] cf = Bytes.toBytes("cf");
set.add(new KeyValue(row, cf, "cone".getBytes(), Bytes.toBytes("row01_cone_one")));
set.add(new KeyValue(row, cf, "ctwo".getBytes(), Bytes.toBytes("row01_ctwo_two")));
set.add(new KeyValue(row, cf, "cthree".getBytes(), Bytes.toBytes("row01_cthree_three")));
set.add(new KeyValue(row, cf, "cfour".getBytes(), Bytes.toBytes("row01_cfour_four")));
set.add(new KeyValue(row, cf, "cfive".getBytes(), Bytes.toBytes("row01_cfive_five")));
set.add(new KeyValue(row, cf, "csix".getBytes(), Bytes.toBytes("row01_csix_six")));

KeyValue[] kvs = new KeyValue[set.size()];
set.toArray(kvs);

Result result = new Result(kvs);
mapDriver.withInput(key, result);

Hope this helps!
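
To see which order "lexicographic" implies for the qualifiers used in this answer, here is a small plain-Java sketch. It is an illustration only: the class, comparator, and method names are hypothetical, and it orders qualifiers alone, whereas the real KeyValue.COMPARATOR compares the row and family before it reaches the qualifier.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Plain-Java illustration of lexicographic (unsigned byte-wise) ordering.
// Not HBase code; all names here are hypothetical.
class QualifierOrderSketch {

    // Compares two byte arrays byte by byte, treating each byte as unsigned.
    // When one array is a prefix of the other, the shorter one sorts first.
    static final Comparator<byte[]> LEXICOGRAPHIC = (a, b) -> {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int diff = (a[i] & 0xff) - (b[i] & 0xff);
            if (diff != 0) {
                return diff;
            }
        }
        return a.length - b.length;
    };

    // Returns the qualifiers in the order a lexicographic sort puts them.
    static List<String> sortedQualifiers(String... qualifiers) {
        byte[][] raw = new byte[qualifiers.length][];
        for (int i = 0; i < qualifiers.length; i++) {
            raw[i] = qualifiers[i].getBytes(StandardCharsets.UTF_8);
        }
        Arrays.sort(raw, LEXICOGRAPHIC);
        List<String> out = new ArrayList<>();
        for (byte[] q : raw) {
            out.add(new String(q, StandardCharsets.UTF_8));
        }
        return out;
    }

    public static void main(String[] args) {
        // Same qualifiers, in the same insertion order, as the answer above.
        System.out.println(sortedQualifiers(
                "cone", "ctwo", "cthree", "cfour", "cfive", "csix"));
    }
}
```

The sorted order differs from the insertion order, which is why passing the KeyValues through a sorted collection before constructing the Result matters.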
