来自java代码的工作流已挂起

mwg9r5ms  于 2021-06-03  发布在  Hadoop
关注(0)|答案(1)|浏览(353)

我正在运行一个有java代码的工作流,它反过来又启动了另一个oozie工作流。主工作流工作正常,但从java代码启动的工作流始终处于挂起状态。我无法恢复它,因为该用户已Map,而不是我。知道有什么问题吗?
这是我的主要工作流程

<java>
        <job-tracker>${jobTracker}</job-tracker>
        <name-node>${nameNode}</name-node>
        <main-class>com.last.play.LaunchJob</main-class>
        <arg>currentUser=${currentUser}</arg>
     </java>

下面是java代码:

Map<String, String> commandArgs = getActionArgs(args);

    Path appPropertyPath = new Path("/user/cmahajan/app.properties");
    Path jobPropertyPath = new Path("/user/cmahajan/job.properties");
    OozieClient wc = new OozieClient("http://host07.com:11000/oozie");

    String userName = commandArgs.get("currentUser");
    System.out.println("User Name recieved ::" + userName);
    Configuration trial = new Configuration();
    FileSystem fs = FileSystem.get(trial);

    Properties conf = wc.createConfiguration();
    Properties jobProperties = new Properties();
    Properties appProperties = new Properties();
    appProperties.load(fs.open(appPropertyPath));
    String version = appProperties.getProperty("version");
    jobProperties.load(fs.open(jobPropertyPath));

    for (Object key : jobProperties.keySet()) {
        String propValue = jobProperties.getProperty((String) key);
        propValue = propValue.replaceAll("\\$\\{user.name\\}", userName);
        conf.setProperty((String) key, propValue);
        System.out.println("Key ::" + key);
        System.out.println("Value ::" + propValue);
        System.out.println(" ===================");
    }
    String appsRoot = "${wfsBasePath}/" + version + "/apps";
    conf.setProperty("appsRoot", appsRoot);
    try {
        String jobId = wc.run(conf);

        System.out.println("Workflow job submitted");

        while (wc.getJobInfo(jobId).getStatus() == WorkflowJob.Status.RUNNING) {
            System.out.println("Workflow job running ...");
            Thread.sleep(10 * 1000);
        }
        System.out.println("Workflow job completed ...");
        System.out.println(wc.getJobInfo(jobId));
    } catch (OozieClientException oozieClientException) {
        oozieClientException.printStackTrace();
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
ef1yzkbh

ef1yzkbh1#

可以使用usergroupinformation设置用户。

UserGroupInformation ugi = UserGroupInformation.createRemoteUser(username);
ugi.doAs(new PrivilegedExceptionAction<MyMapReduceWrapperClass>() {
public Object run() throws Exception {
    MyMapReduceWrapperClass mr = new MyMapReduceWrapperClass();
    ToolRunner.run(mr, null);
    return mr;
}
});

相关问题