谁能解释一下如何通过shell加载RegionProcessor吗?我无法获得有关加载和部署协处理器的正确信息。提前谢谢
m1m5dgzv1#
请按照以下步骤操作:步骤1:创建接口并扩展 org.apache.hadoop.hbase.ipc.CoprocessorProtocol 第2步:在协处理器调用完成后要执行的接口中定义方法步骤3:创建 HTable 第四步:呼叫 HTable.coprocessorExec() 具有所有必需参数的方法请查看以下示例:在这个例子中,我们试图得到注册号在我们感兴趣的范围内的学生的名单。正在创建接口协议:
org.apache.hadoop.hbase.ipc.CoprocessorProtocol
HTable
HTable.coprocessorExec()
public interface CoprocessorTestProtocol extends org.apache.hadoop.hbase.ipc.CoprocessorProtocol{ List<Student> getStudentList(byte[] startRegistrationNumber, byte[] endRegistrationNumber) throws IOException; }
学生班级示例:
public class Student implements Serializable{ byte[] registrationNumber; String name; public void setRegistrationNumber(byte[] registrationNumber){ this.registrationNumber = registrationNumber; } public byte[] getRegistrationNumber(){ return this.registrationNumber; } public void setName(String name){ this.name = name; } public int getName(){ return this.name; } public String toString(){ return "Student[ registration number = " + Bytes.toInt(this.getRegistrationNumber()) + " name = " + this.getName() + " ]" } }
模型类:[其中写入了从hbase获取数据的业务逻辑]
public class MyModel extends org.apache.hadoop.hbase.coprocessor.BaseEndpointCoprocessor implements CoprocessorTestProtocol{ @Override List<Student> getStudentList(byte[] startRegistrationNumber, byte[] endRegistrationNumber){ Scan scan = new Scan(); scan.setStartRow(startRegistrationNumber); scan.setStopRow(endRegistrationNumber); InternalScanner scanner = ((RegionCoprocessorEnvironment) getEnvironment()).getRegion().getScanner(scan); List<KeyValue> currentTempObj = new ArrayList<KeyValue>(); List<Student> studentList = new ArrayList<Student>(); try{ Boolean hasNext = false; Student student; do{ currentTempObj.clear(); hasNext = scanner.next(currentTempObj); if(!currentTempObj.isEmpty()){ student = new Student(); for(KeyValue keyValue: currentTempObj){ bytes[] qualifier = keyValue.getQualifier(); if(Arrays.equals(qualifier, Bytes.toBytes("registrationNumber"))) student.setRegistrationNumber(keyValue.getValue()); else if(Arrays.equals(qualifier, Bytes.toBytes("name"))) student.setName(Bytes.toString(keyValue.getValue())); } StudentList.add(student); } }while(hasNext); }catch (Exception e){ // catch the exception the way you want } finally{ scanner.close(); } } }
客户端类:[调用协处理器的地方]
public class MyClient{ if (args.length < 2) { System.out.println("Usage : startRegistrationNumber endRegistrationNumber"); return; } public List<Student> displayStudentInfo(int startRegistrationNumber, int endRegistrationNumber){ final byte[] startKey=Bytes.toBytes(startRegistrationNumber); final byte[] endKey=Bytes.toBytes(endRegistrationNumber); String zkPeers = SystemInfo.getHBaseZkConnectString(); Configuration configuration=HBaseConfiguration.create(); configuration.set(HConstants.ZOOKEEPER_QUORUM, zkPeers); HTableInterface table = new HTable(configuration, TABLE_NAME); Map<byte[],List<Student>> allRegionOutput; allRegionOutput = table.coprocessorExec(CoprocessorTestProtocol.class, startKey,endKey, new Batch.Call<CoprocessorTestProtocol, List<Student>>() { public List<Student> call(CoprocessorTestProtocol instance)throws IOException{ return instance.getStudentList(startKey, endKey); } }); table.close(); List<Student> anotherList = new ArrayList<Student>(); for (List<Student> studentData: allRegionOutput.values()){ anotherList.addAll(studentData); } return anotherList; } public static void main(String args){ if (args.length < 2) { System.out.println("Usage : startRegistrationNumber endRegistrationNumber"); return; } int startRegistrationNumber = args[0]; int endRegistrationNumber = args[1]; for (Student student : displayStudentInfo(startRegistrationNumber, endRegistrationNumber)){ System.out.println(student); } } }
请注意:请特别留意 Scanner.next(Object) 方法。这将返回布尔值并将当前对象存储在 Object 论点
Scanner.next(Object)
Object
1条答案
按热度按时间m1m5dgzv1#
请按照以下步骤操作:
步骤1:创建接口并扩展
org.apache.hadoop.hbase.ipc.CoprocessorProtocol
第2步:在协处理器调用完成后要执行的接口中定义方法步骤3:创建
HTable
第四步:呼叫HTable.coprocessorExec()
具有所有必需参数的方法请查看以下示例:
在这个例子中,我们试图得到注册号在我们感兴趣的范围内的学生的名单。
正在创建接口协议:
学生班级示例:
模型类:[其中写入了从hbase获取数据的业务逻辑]
客户端类:[调用协处理器的地方]
请注意:请特别留意
Scanner.next(Object)
方法。这将返回布尔值并将当前对象存储在Object
论点