下面是我正在编写的代码的简短描述:
我们使用apachehbase api在hbase数据库中存储数据。模式由各种数据类型的属性组成,如date、float、varchar、char等。。。
现在,我们需要访问一个元组ie的不同版本,这一行将在多年的不同时间更新,我们希望访问所有这些不同的版本。
目前apacheapi只支持
1) 定义在使用ddl创建表时表应维护的版本数
2) 创建新连接时,指定所有查询应在其上工作的表的版本号
https://community.hortonworks.com/questions/51846/query-versions-in-phoenix.html
但这太有限了,hbase api支持时间范围,并在我们需要的时间范围内设置最大版本。所以我决定使用hbase-api访问phoenix-api存储的数据。
这是我面临的问题:
1) 我想根据主键中的任何属性筛选行。我的主键由9个属性组成:
char(10),char(10),char(3),varchar(40),varchar(8),varchar(8),varchar(40),varchar(256),日期
phoenix api连接这些值并从中创建一个行键,如下所示:
$qa$f62和81ppedoid01 pgkbloombrg\x00供应商\x00prcquote\x00bb\x001\x00\x80\x00\x01ad\x5c\xfc\x00
我正在使用hbase行过滤器与子字符串匹配等于比较器来过滤行的基础上,他们的主键值。。。
Filter IDFilter = new RowFilter(CompareOp.EQUAL, new SubstringComparator("$qA$F62&81"));
Filter CurrencyCodeFilter = new RowFilter(CompareOp.EQUAL, new SubstringComparator("PGK"));
ArrayList<Filter> filters = new ArrayList<>();
filters.add(IDFilter);
filters.add(CurrencyCodeFilter);
FilterList filterList = new FilterList(Operator.MUST_PASS_ALL ,filters);
scan.setMaxVersions(1);
scan.setFilter(filterList);
这对于char、varchar和numbers这样的主键属性非常有效。但我不能根据日期筛选出来,这是非常必要的。
日期的问题是:
我不理解它使用的编码,例如phoenix api将日期“2018-01-30”存储为\x80\x00\x01ad\x5c\xfc\x00
我知道phoenix api将“\x00”放在varchar之后作为分隔符,但我不理解这种编码。
所以我尝试在hbase shell中运行以下命令:
hbase(main):007:0>扫描“hstp2”,{filter=>“rowfilter(=,'substring:\x80\x00\x01ad\x5c\xfc\x00')”}
我得到了正确的结果
但当我使用hbase api在java中尝试同样的方法时,没有得到任何结果:
Filter DateFilter = new RowFilter(CompareOp.EQUAL, new SubstringComparator("\\x80\\x00\\x01aD\\x5C\\xFC\\x00"));
当我清除日期过滤器时,我得到了这个
行筛选器(相等,\x5cx80\x5cx00\x5cx01ad\x5cx5c\x5cxfc\x5cx00)
“\”>“\x5c”的转换是问题的原因,因此我没有得到任何结果。
如何根据任何日期执行行筛选?我需要将日期转换为phoenix api存储它的格式,然后运行行过滤器吗?还是有别的办法?
这是我迄今为止测试基于不同属性过滤和解码提取数据的代码:
import java.io.IOException;
import java.text.DateFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.Locale;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
import org.apache.hadoop.hbase.filter.PrefixFilter;
import org.apache.hadoop.hbase.filter.QualifierFilter;
import org.apache.hadoop.hbase.filter.RegexStringComparator;
import org.apache.hadoop.hbase.filter.RowFilter;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.filter.SubstringComparator;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.CompareType;
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.BinaryPrefixComparator;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.Filter.ReturnCode;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.FilterList.Operator;
import org.apache.phoenix.schema.PStringColumn;
import org.apache.phoenix.schema.SortOrder;
import org.apache.phoenix.schema.types.PDataType;
import org.apache.phoenix.schema.types.PDate;
import org.apache.phoenix.schema.types.PFloat;
import org.apache.phoenix.schema.types.PInteger;
import org.apache.phoenix.shaded.org.apache.directory.shared.kerberos.codec.types.PaDataType;
import org.apache.phoenix.shaded.org.joni.Regex;
import org.apache.hadoop.hbase.util.Bytes;
public class HbaseVersionedPriceFetcher {
public static void main(String[] args) {
try {
Configuration conf = HBaseConfiguration.create(new Configuration());
conf.set("hbase.zookeeper.quorum", "hostName");//Private Detail
conf.set("hbase.zookeeper.property.clientPort", "2181");
HTable table = new HTable(conf, "HSTP2");
// Filter filter = new SingleColumnValueFilter("0".getBytes(),"LAST_CHG_USR_ID".getBytes(), CompareOp.EQUAL, "AUTO:GEN:SCRIPT".getBytes());
// Filter filter = new SingleColumnValueFilter("ISPH".getBytes(),"MKT_OID".getBytes(), CompareOp.EQUAL, "MARKET".getBytes());
// Filter filter = new SingleColumnValueFilter("ISPH".getBytes(),"VALIDATED_PRC_TYPE".getBytes(), CompareOp.EQUAL, "MID".getBytes());
Scan scan = new Scan();
//Filter List
Filter IDFilter = new RowFilter(CompareOp.EQUAL, new SubstringComparator("qA$F62&81"));
Filter CurrencyCodeFilter = new RowFilter(CompareOp.EQUAL, new SubstringComparator("PGK"));
ArrayList<Filter> filters = new ArrayList<>();
filters.add(IDFilter);
filters.add(CurrencyCodeFilter);
FilterList filterList = new FilterList(Operator.MUST_PASS_ALL ,filters);
scan.setMaxVersions(1);
scan.setFilter(filterList);
//REGEX
//Filter filter = new RowFilter(CompareOp.EQUAL, new RegexStringComparator(".*PGK.*VENDOR.*"))
//scan.addColumn("ISPH".getBytes(), "ADJST_TMS".getBytes());
// long start = new Long("1529578558767");
// long end = new Long("1529580854059");
//
// try {
// scan.setTimeRange(start,end);
// } catch (IOException e) {
// // TODO Auto-generated catch block
// e.printStackTrace();
// }
ResultScanner scanner = table.getScanner(scan);
int count = 0;
for (Result rr : scanner) {
count += 1;
System.out.println("Instrument "+ count);
System.out.println(rr);
for (KeyValue value: rr.raw()) {
String qualifier = new String(value.getQualifier());
System.out.print( qualifier+" : ");
byte[] valByteArray = value.getValue();
if(qualifier.equals("ASK_CPRC") || qualifier.equals("BID_CPRC") || qualifier.equals("MID_CPRC") || qualifier.equals("VALIDATED_CPRC")) {
float decoded = PFloat.INSTANCE.getCodec().decodeFloat(valByteArray, 0, SortOrder.getDefault());
System.out.println(decoded);
} else if (qualifier.equals("LAST_CHG_TMS") || qualifier.equals("ADJST_TMS") ) {
System.out.println(PDate.INSTANCE.toObject(valByteArray, SortOrder.getDefault()));
} else if (qualifier.equals("HST_PRC_DTE_OF_NUM")) {
int decoded = PInteger.INSTANCE.getCodec().decodeInt(valByteArray, 0, SortOrder.getDefault());
System.out.println(decoded);
} else {
System.out.println(new String(valByteArray));
}
}
}
scanner.close();
} catch (IOException e1) {
e1.printStackTrace();
}
}
static byte[] getBytes(String string) {
return string.getBytes();
}
}
暂无答案!
目前还没有任何答案,快来回答吧!