使用kerberos在Map器中连接到acumulo

lx0bsm1f  于 2021-06-02  发布在  Hadoop
关注(0)|答案(1)|浏览(257)

我正在将一些软件从旧的hadoop集群(使用用户名/密码身份验证)移到新的2.6.0-cdh5.12.0,它启用了kerberos身份验证。
我已经能够使用accumuloinput/outputformat类中的delegationtoken集,使许多使用accumulo进行输入和/或输出的map/reduce作业正常工作。
但是,我有一个作业,它使用acumuloinput/outputformat进行输入和输出,但是在它的mapper.setup()方法中,它通过zookeeper连接到acumulo,以便在mapper.map()方法中,它可以将正在处理的每个键/值与mapper.map()和另一个acumulo表中的条目进行比较。
我包含了下面的相关代码,其中显示了setup()方法,该方法连接zookeeper用户一个passwordtoken,然后创建一个acumulo表扫描器,然后在mapper方法中使用。
所以问题是如何用kerberostoken替换passwordtoken,以便在mapper.setup()方法中设置accumulo扫描仪?我找不到方法“获取”我设置的accumuloinput/outputformat类使用的delegationtoken。
我尝试了context.getcredentials().getalltokens()并查找类型为org.apache.accumulo.code.client.security.tokens.authenticationtoken的令牌--这里返回的所有令牌的类型都是org.apache.hadoop.security.token.token。
请注意,当代码在未连接到internet的网络上运行时,我键入了代码片段,而不是剪切/粘贴-也就是说,可能有一个输入错误。:)

//****************************
// code in the M/R driver
//****************************
ClientConfiguration accumuloCfg = ClientConfiguration.loadDefault().withInstance("Accumulo1").withZkHosts("zookeeper1");
ZooKeeperInstance inst = new ZooKeeperInstance(accumuloCfg);
AuthenticationToken dt = conn.securityOperations().getDelegationToken(new DelagationTokenConfig());
AccumuloInputFormat.setConnectorInfo(job, username, dt);
AccumuloOutputFormat.setConnectorInfo(job, username, dt);
// other job setup and then
job.waitForCompletion(true)

//****************************
// this is inside the Mapper class of the M/R job
//****************************
private Scanner index_scanner;

public void setup(Context context) {
    Configuration cfg = context.getConfiguration();

    // properties set and passed from M/R Driver program
    String username = cfg.get("UserName");
    String password = cfg.get("Password");
    String accumuloInstName = cfg.get("InstanceName");
    String zookeepers = cfg.get("Zookeepers");
    String tableName = cfg.get("TableName");
    Instance inst = new ZooKeeperInstance(accumuloInstName, zookeepers);
    try {
      AuthenticationToken passwordToken = new PasswordToken(password);

      Connector conn = inst.getConnector(username, passwordToken);

      index_scanner = conn.createScanner(tableName, conn.securityOperations().getUserAuthorizations(username));
    } catch(Exception e) {
       e.printStackTrace();
    }
}

public void map(Key key, Value value, Context context) throws IOException, InterruptedException {
    String uuid = key.getRow().toString();
    index_scanner.clearColumns();
    index_scanner.setRange(Range.exact(uuid));
    for(Entry<Key, Value> entry : index_scanner) {
        // do some processing in here
    }
}
5ssjco0h

5ssjco0h1#

提供的AccumuOutputFormat和AccumuOutputFormat有一个方法,可以使用 Accumulo*putFormat.setConnectorInfo(job, principle, token) . 您还可以使用authenticationtokenserializer并使用 setConnectorInfo 接受文件名的方法。
如果传入一个kerberostoken,作业将创建一个delegationtoken来使用,如果传入一个delegationtoken,它将只使用它。
提供的 AccumuloInputFormat 应该处理自己的扫描器,所以通常情况下,如果配置设置正确,就不必在Map器中这样做。但是,如果您在mapper中进行二次扫描(例如连接),则可以检查提供的 AccumuloInputFormat 的recordreader源代码,以获取如何检索配置和构造扫描仪的示例。

相关问题