How to load a coprocessor step by step

gzjq41n4  posted on 2021-06-02  in Hadoop

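Can anyone explain how to load a RegionProcessor through the shell? I have not been able to find proper information on loading and deploying coprocessors. Thanks in advance.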

m1m5dgzv  #1

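Follow these steps:
Step 1: Create an interface that extends org.apache.hadoop.hbase.ipc.CoprocessorProtocol.
Step 2: In that interface, declare the method you want executed when the coprocessor is called.
Step 3: Create an HTable instance.
Step 4: Call HTable.coprocessorExec() with all the required parameters.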
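See the example below.
In this example, we try to get the list of students whose registration numbers fall within a range we are interested in.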
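Since the example selects students by a registration-number range, it may help to see how an int range maps onto row keys. The following is a minimal plain-Java sketch, not HBase code: it assumes the row key is the registration number encoded as 4 big-endian bytes (the layout Bytes.toBytes(int) produces) and that the scan treats the start row as inclusive and the stop row as exclusive. The class and method names (RowKeyRangeSketch, toKey, compareKeys, inScanRange) are hypothetical.

```java
import java.nio.ByteBuffer;

// Plain-Java illustration; none of these names come from HBase.
class RowKeyRangeSketch {

    /** Encodes an int as 4 big-endian bytes, the same layout Bytes.toBytes(int) uses. */
    static byte[] toKey(int value) {
        return ByteBuffer.allocate(4).putInt(value).array();
    }

    /** Unsigned lexicographic comparison, the order in which row keys are sorted. */
    static int compareKeys(byte[] a, byte[] b) {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int diff = (a[i] & 0xff) - (b[i] & 0xff);
            if (diff != 0) {
                return diff;
            }
        }
        return a.length - b.length;
    }

    /** True when key lies in [start, stop): start inclusive, stop exclusive. */
    static boolean inScanRange(byte[] key, byte[] start, byte[] stop) {
        return compareKeys(key, start) >= 0 && compareKeys(key, stop) < 0;
    }

    public static void main(String[] args) {
        byte[] start = toKey(100);
        byte[] stop = toKey(200);
        System.out.println(inScanRange(toKey(150), start, stop)); // true
        System.out.println(inScanRange(toKey(200), start, stop)); // false: stop row is excluded
        // Negative ints start with byte 0xFF, so they sort after every non-negative key.
        System.out.println(compareKeys(toKey(-1), toKey(1)) > 0);  // true
    }
}
```

Under these assumptions, a caller who wants registration number 200 included would pass 201 as the end of the range, and negative registration numbers would need a different key encoding.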
Creating the protocol interface:

import java.io.IOException;
import java.util.List;

public interface CoprocessorTestProtocol extends org.apache.hadoop.hbase.ipc.CoprocessorProtocol {
    List<Student> getStudentList(byte[] startRegistrationNumber, byte[] endRegistrationNumber) throws IOException;
}

Sample Student class:

import java.io.Serializable;
import org.apache.hadoop.hbase.util.Bytes;

public class Student implements Serializable {
    byte[] registrationNumber;
    String name;

    public void setRegistrationNumber(byte[] registrationNumber){
        this.registrationNumber = registrationNumber;
    }

    public byte[] getRegistrationNumber(){
        return this.registrationNumber;
    }

    public void setName(String name){
        this.name = name;
    }

    public String getName(){
        return this.name;
    }

    public String toString(){
        return "Student[ registration number = " + Bytes.toInt(this.getRegistrationNumber()) + " name = " + this.getName() + " ]";
    }
}

Model class: [where the business logic for fetching data from HBase is written]

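import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.BaseEndpointCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.util.Bytes;

public class MyModel extends BaseEndpointCoprocessor implements CoprocessorTestProtocol {

    @Override
    public List<Student> getStudentList(byte[] startRegistrationNumber, byte[] endRegistrationNumber) throws IOException {
        // Restrict the scan to the requested row-key range (the stop row is exclusive).
        Scan scan = new Scan();
        scan.setStartRow(startRegistrationNumber);
        scan.setStopRow(endRegistrationNumber);

        // Scan only the region this endpoint instance is running on.
        InternalScanner scanner = ((RegionCoprocessorEnvironment) getEnvironment()).getRegion().getScanner(scan);

        List<KeyValue> currentTempObj = new ArrayList<KeyValue>();
        List<Student> studentList = new ArrayList<Student>();

        try {
            boolean hasNext;

            do {
                currentTempObj.clear();
                // Fills currentTempObj with the cells of the current row and
                // returns false once no rows remain after it.
                hasNext = scanner.next(currentTempObj);

                if (!currentTempObj.isEmpty()) {
                    Student student = new Student();
                    for (KeyValue keyValue : currentTempObj) {
                        byte[] qualifier = keyValue.getQualifier();
                        if (Arrays.equals(qualifier, Bytes.toBytes("registrationNumber")))
                            student.setRegistrationNumber(keyValue.getValue());
                        else if (Arrays.equals(qualifier, Bytes.toBytes("name")))
                            student.setName(Bytes.toString(keyValue.getValue()));
                    }
                    studentList.add(student);
                }
            } while (hasNext);

        } catch (Exception e) {
            // handle the exception the way you want
        } finally {
            scanner.close();
        }

        return studentList;
    }
}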

Client class: [where the coprocessor is invoked]

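import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.util.Bytes;

public class MyClient {

    // TABLE_NAME and SystemInfo.getHBaseZkConnectString() are not defined in this answer;
    // supply the table name and ZooKeeper quorum of your own cluster.

    public static List<Student> displayStudentInfo(int startRegistrationNumber, int endRegistrationNumber) throws Throwable {
        final byte[] startKey = Bytes.toBytes(startRegistrationNumber);
        final byte[] endKey = Bytes.toBytes(endRegistrationNumber);

        String zkPeers = SystemInfo.getHBaseZkConnectString();
        Configuration configuration = HBaseConfiguration.create();
        configuration.set(HConstants.ZOOKEEPER_QUORUM, zkPeers);

        HTableInterface table = new HTable(configuration, TABLE_NAME);

        Map<byte[], List<Student>> allRegionOutput;
        try {
            // Invokes getStudentList on the regions that cover the key range;
            // the result map holds one list per region.
            allRegionOutput = table.coprocessorExec(CoprocessorTestProtocol.class, startKey, endKey,
                    new Batch.Call<CoprocessorTestProtocol, List<Student>>() {
                        public List<Student> call(CoprocessorTestProtocol instance) throws IOException {
                            return instance.getStudentList(startKey, endKey);
                        }
                    });
        } finally {
            table.close();
        }

        // Merge the per-region lists into a single result.
        List<Student> anotherList = new ArrayList<Student>();
        for (List<Student> studentData : allRegionOutput.values()) {
            anotherList.addAll(studentData);
        }

        return anotherList;
    }

    public static void main(String[] args) throws Throwable {

        if (args.length < 2) {
            System.out.println("Usage : startRegistrationNumber endRegistrationNumber");
            return;
        }

        int startRegistrationNumber = Integer.parseInt(args[0]);
        int endRegistrationNumber = Integer.parseInt(args[1]);

        for (Student student : displayStudentInfo(startRegistrationNumber, endRegistrationNumber)) {
            System.out.println(student);
        }
    }
}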
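The client collects one list per region and then flattens them. The following is a minimal plain-Java sketch of that merge step on its own, assuming the result map is keyed by byte[] region names and ordered by an unsigned byte comparison. The names RegionResultMergeSketch, KEY_ORDER and flatten are hypothetical, and the null check is a defensive assumption rather than something the answer requires.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java illustration; none of these names come from HBase.
class RegionResultMergeSketch {

    /** Orders byte[] keys by unsigned lexicographic comparison (arrays have no content-based equals). */
    static final Comparator<byte[]> KEY_ORDER = (a, b) -> {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int diff = (a[i] & 0xff) - (b[i] & 0xff);
            if (diff != 0) {
                return diff;
            }
        }
        return a.length - b.length;
    };

    /** Concatenates the per-region lists in map iteration order, skipping null entries. */
    static <T> List<T> flatten(Map<byte[], List<T>> perRegion) {
        List<T> merged = new ArrayList<>();
        for (List<T> part : perRegion.values()) {
            if (part != null) {
                merged.addAll(part);
            }
        }
        return merged;
    }

    public static void main(String[] args) {
        Map<byte[], List<String>> perRegion = new TreeMap<>(KEY_ORDER);
        perRegion.put("region-b".getBytes(StandardCharsets.UTF_8), Arrays.asList("Cid"));
        perRegion.put("region-a".getBytes(StandardCharsets.UTF_8), Arrays.asList("Ann", "Bob"));
        System.out.println(flatten(perRegion)); // [Ann, Bob, Cid]
    }
}
```

Because the map is sorted by key, the merged list follows region order; a caller who needs a different order would sort the merged list afterwards.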

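Note: pay particular attention to the Scanner.next(Object) method. It returns a boolean and stores the current row in the Object argument. The boolean reports whether more rows remain after the current one, so the call that delivers the last row returns false; this is why the endpoint reads with a do/while loop.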
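To make that contract concrete, here is a minimal plain-Java sketch that mimics it without HBase. RowSource is a hypothetical stand-in for the scanner, not an HBase type: its next(List) fills the list with the current row and returns whether more rows follow. The drain method uses the same do/while shape as the endpoint.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Plain-Java illustration; RowSource is hypothetical and only imitates the scanner contract.
class ScannerLoopSketch {

    /** Fills 'out' with the current row and reports whether more rows follow it. */
    interface RowSource {
        boolean next(List<String> out);
    }

    /** Builds a RowSource over fixed rows; each row is a list of cell values. */
    static RowSource over(List<List<String>> rows) {
        Iterator<List<String>> it = rows.iterator();
        return out -> {
            if (it.hasNext()) {
                out.addAll(it.next());
            }
            return it.hasNext(); // false on the call that delivers the last row
        };
    }

    /** Reads every row; a plain while (source.next(...)) loop would drop the last one. */
    static List<List<String>> drain(RowSource source) {
        List<List<String>> collected = new ArrayList<>();
        List<String> current = new ArrayList<>();
        boolean hasNext;
        do {
            current.clear();
            hasNext = source.next(current);
            if (!current.isEmpty()) {
                collected.add(new ArrayList<>(current)); // copy, since 'current' is reused
            }
        } while (hasNext);
        return collected;
    }

    public static void main(String[] args) {
        List<List<String>> rows = Arrays.asList(
                Arrays.asList("101", "Ann"),
                Arrays.asList("102", "Bob"));
        System.out.println(drain(over(rows))); // [[101, Ann], [102, Bob]]
    }
}
```

The isEmpty check covers the case where the range matches no rows at all: the first call returns false and leaves the list empty, so nothing is collected.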
