使用hbase api过滤和解码使用phoenix api存储的数据

n3schb8v  于 2021-05-29  发布在  Hadoop
关注(0)|答案(0)|浏览(263)

下面是我正在编写的代码的简短描述:
我们使用apachehbase api在hbase数据库中存储数据。模式由各种数据类型的属性组成,如date、float、varchar、char等。。。
现在,我们需要访问一个元组ie的不同版本,这一行将在多年的不同时间更新,我们希望访问所有这些不同的版本。
目前apacheapi只支持
1) 定义在使用ddl创建表时表应维护的版本数
2) 创建新连接时,指定所有查询应在其上工作的表的版本号
https://community.hortonworks.com/questions/51846/query-versions-in-phoenix.html
但这太有限了,hbase api支持时间范围,并在我们需要的时间范围内设置最大版本。所以我决定使用hbase-api访问phoenix-api存储的数据。
这是我面临的问题:
1) 我想根据主键中的任何属性筛选行。我的主键由9个属性组成:
char(10),char(10),char(3),varchar(40),varchar(8),varchar(8),varchar(40),varchar(256),日期
phoenix api连接这些值并从中创建一个行键,如下所示:
$qa$f62和81ppedoid01 pgkbloombrg\x00供应商\x00prcquote\x00bb\x001\x00\x80\x00\x01ad\x5c\xfc\x00
我正在使用hbase行过滤器与子字符串匹配等于比较器来过滤行的基础上,他们的主键值。。。

  1. Filter IDFilter = new RowFilter(CompareOp.EQUAL, new SubstringComparator("$qA$F62&81"));
  2. Filter CurrencyCodeFilter = new RowFilter(CompareOp.EQUAL, new SubstringComparator("PGK"));
  3. ArrayList<Filter> filters = new ArrayList<>();
  4. filters.add(IDFilter);
  5. filters.add(CurrencyCodeFilter);
  6. FilterList filterList = new FilterList(Operator.MUST_PASS_ALL ,filters);
  7. scan.setMaxVersions(1);
  8. scan.setFilter(filterList);

这对于char、varchar和numbers这样的主键属性非常有效。但我不能根据日期筛选出来,这是非常必要的。
日期的问题是:
我不理解它使用的编码,例如phoenix api将日期“2018-01-30”存储为\x80\x00\x01ad\x5c\xfc\x00
我知道phoenix api将“\x00”放在varchar之后作为分隔符,但我不理解这种编码。
所以我尝试在hbase shell中运行以下命令:
hbase(main):007:0>扫描“hstp2”,{filter=>“rowfilter(=,'substring:\x80\x00\x01ad\x5c\xfc\x00')”}
我得到了正确的结果
但当我使用hbase api在java中尝试同样的方法时,没有得到任何结果:

  1. Filter DateFilter = new RowFilter(CompareOp.EQUAL, new SubstringComparator("\\x80\\x00\\x01aD\\x5C\\xFC\\x00"));

当我清除日期过滤器时,我得到了这个
行筛选器(相等,\x5cx80\x5cx00\x5cx01ad\x5cx5c\x5cxfc\x5cx00)
“\”>“\x5c”的转换是问题的原因,因此我没有得到任何结果。
如何根据任何日期执行行筛选?我需要将日期转换为phoenix api存储它的格式,然后运行行过滤器吗?还是有别的办法?
这是我迄今为止测试基于不同属性过滤和解码提取数据的代码:

  1. import java.io.IOException;
  2. import java.text.DateFormat;
  3. import java.text.ParseException;
  4. import java.text.SimpleDateFormat;
  5. import java.util.ArrayList;
  6. import java.util.Date;
  7. import java.util.Locale;
  8. import org.apache.hadoop.conf.Configuration;
  9. import org.apache.hadoop.hbase.Cell;
  10. import org.apache.hadoop.hbase.HBaseConfiguration;
  11. import org.apache.hadoop.hbase.KeyValue;
  12. import org.apache.hadoop.hbase.client.Get;
  13. import org.apache.hadoop.hbase.client.HTable;
  14. import org.apache.hadoop.hbase.client.Result;
  15. import org.apache.hadoop.hbase.client.ResultScanner;
  16. import org.apache.hadoop.hbase.client.Scan;
  17. import org.apache.hadoop.hbase.filter.Filter;
  18. import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
  19. import org.apache.hadoop.hbase.filter.PrefixFilter;
  20. import org.apache.hadoop.hbase.filter.QualifierFilter;
  21. import org.apache.hadoop.hbase.filter.RegexStringComparator;
  22. import org.apache.hadoop.hbase.filter.RowFilter;
  23. import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
  24. import org.apache.hadoop.hbase.filter.SubstringComparator;
  25. import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.CompareType;
  26. import org.apache.hadoop.hbase.filter.BinaryComparator;
  27. import org.apache.hadoop.hbase.filter.BinaryPrefixComparator;
  28. import org.apache.hadoop.hbase.filter.CompareFilter;
  29. import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
  30. import org.apache.hadoop.hbase.filter.Filter.ReturnCode;
  31. import org.apache.hadoop.hbase.filter.FilterList;
  32. import org.apache.hadoop.hbase.filter.FilterList.Operator;
  33. import org.apache.phoenix.schema.PStringColumn;
  34. import org.apache.phoenix.schema.SortOrder;
  35. import org.apache.phoenix.schema.types.PDataType;
  36. import org.apache.phoenix.schema.types.PDate;
  37. import org.apache.phoenix.schema.types.PFloat;
  38. import org.apache.phoenix.schema.types.PInteger;
  39. import org.apache.phoenix.shaded.org.apache.directory.shared.kerberos.codec.types.PaDataType;
  40. import org.apache.phoenix.shaded.org.joni.Regex;
  41. import org.apache.hadoop.hbase.util.Bytes;
  42. public class HbaseVersionedPriceFetcher {
  43. public static void main(String[] args) {
  44. try {
  45. Configuration conf = HBaseConfiguration.create(new Configuration());
  46. conf.set("hbase.zookeeper.quorum", "hostName");//Private Detail
  47. conf.set("hbase.zookeeper.property.clientPort", "2181");
  48. HTable table = new HTable(conf, "HSTP2");
  49. // Filter filter = new SingleColumnValueFilter("0".getBytes(),"LAST_CHG_USR_ID".getBytes(), CompareOp.EQUAL, "AUTO:GEN:SCRIPT".getBytes());
  50. // Filter filter = new SingleColumnValueFilter("ISPH".getBytes(),"MKT_OID".getBytes(), CompareOp.EQUAL, "MARKET".getBytes());
  51. // Filter filter = new SingleColumnValueFilter("ISPH".getBytes(),"VALIDATED_PRC_TYPE".getBytes(), CompareOp.EQUAL, "MID".getBytes());
  52. Scan scan = new Scan();
  53. //Filter List
  54. Filter IDFilter = new RowFilter(CompareOp.EQUAL, new SubstringComparator("qA$F62&81"));
  55. Filter CurrencyCodeFilter = new RowFilter(CompareOp.EQUAL, new SubstringComparator("PGK"));
  56. ArrayList<Filter> filters = new ArrayList<>();
  57. filters.add(IDFilter);
  58. filters.add(CurrencyCodeFilter);
  59. FilterList filterList = new FilterList(Operator.MUST_PASS_ALL ,filters);
  60. scan.setMaxVersions(1);
  61. scan.setFilter(filterList);
  62. //REGEX
  63. //Filter filter = new RowFilter(CompareOp.EQUAL, new RegexStringComparator(".*PGK.*VENDOR.*"))
  64. //scan.addColumn("ISPH".getBytes(), "ADJST_TMS".getBytes());
  65. // long start = new Long("1529578558767");
  66. // long end = new Long("1529580854059");
  67. //
  68. // try {
  69. // scan.setTimeRange(start,end);
  70. // } catch (IOException e) {
  71. // // TODO Auto-generated catch block
  72. // e.printStackTrace();
  73. // }
  74. ResultScanner scanner = table.getScanner(scan);
  75. int count = 0;
  76. for (Result rr : scanner) {
  77. count += 1;
  78. System.out.println("Instrument "+ count);
  79. System.out.println(rr);
  80. for (KeyValue value: rr.raw()) {
  81. String qualifier = new String(value.getQualifier());
  82. System.out.print( qualifier+" : ");
  83. byte[] valByteArray = value.getValue();
  84. if(qualifier.equals("ASK_CPRC") || qualifier.equals("BID_CPRC") || qualifier.equals("MID_CPRC") || qualifier.equals("VALIDATED_CPRC")) {
  85. float decoded = PFloat.INSTANCE.getCodec().decodeFloat(valByteArray, 0, SortOrder.getDefault());
  86. System.out.println(decoded);
  87. } else if (qualifier.equals("LAST_CHG_TMS") || qualifier.equals("ADJST_TMS") ) {
  88. System.out.println(PDate.INSTANCE.toObject(valByteArray, SortOrder.getDefault()));
  89. } else if (qualifier.equals("HST_PRC_DTE_OF_NUM")) {
  90. int decoded = PInteger.INSTANCE.getCodec().decodeInt(valByteArray, 0, SortOrder.getDefault());
  91. System.out.println(decoded);
  92. } else {
  93. System.out.println(new String(valByteArray));
  94. }
  95. }
  96. }
  97. scanner.close();
  98. } catch (IOException e1) {
  99. e1.printStackTrace();
  100. }
  101. }
  102. static byte[] getBytes(String string) {
  103. return string.getBytes();
  104. }
  105. }

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题