如何在oozie的决策节点中动态获取文件名?

rjee0c15  于 2021-05-29  发布在  Hadoop
关注(0)|答案(2)|浏览(417)

我想检查文件是否存在,在hdfs位置使用oozie批处理。
在我的hdfs位置,每天晚上11点我都会收到类似“test\u 08\u 01\u 2016.csv”、“test\u 08\u 02\u 2016.csv”的文件。
所以我想检查文件是否存在是在晚上11点15分之后,我可以检查文件是否存在不使用决策节点。使用下面的工作流。

<workflow-app name="HIVECoWorkflow" xmlns="uri:oozie:workflow:0.5">
<start to="CheckFile"/>
<decision name="CheckFile">
     <switch>
        <case to="nextOozieTask">
          ${fs:exists("/user/cloudera/file/input/test_08_01_2016.csv")}
        </case>
        <default to="MailActionFileMissing" />
     </switch>
<action name="MailActionFileMissing" cred="hive2">
    <hive2 xmlns="uri:oozie:hive2-action:0.1">
        <job-tracker>${jobTracker}</job-tracker>
        <name-node>${nameNode}</name-node>
        <jdbc-url>jdbc:hive2://quickstart.cloudera:10000/default</jdbc-url>
        <script>/user/cloudera/email/select.hql</script>
        <file>/user/cloudera/hive-site.xml</file>
    </hive2>
    <ok to="End"/>
    <error to="Kill"/>
</action>
<action name="nextOozieTask" cred="hive2">
    <hive2 xmlns="uri:oozie:hive2-action:0.1">
        <job-tracker>${jobTracker}</job-tracker>
        <name-node>${nameNode}</name-node>
        <jdbc-url>jdbc:hive2://quickstart.cloudera:10000/default</jdbc-url>
        <script>/user/cloudera/email/select1.hql</script>
        <file>/user/cloudera/hive-site.xml</file>
    </hive2>
    <ok to="End"/>
    <error to="Kill"/>
</action>

 <kill name="Kill">
    <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<end name="End"/>

但我想动态获取文件名,例如“ filenamt_todaysdate i.e test_08_01_2016.csv ".
请帮我解决这个问题,我怎样才能找到你。
提前谢谢。

zujrkrfu

zujrkrfu1#

上述问题的解决方案是,我们必须从协调作业中获取日期值,如下面的代码所示,在协调作业中。

<property>
    <name>today</name>
    <value>${coord:formatTime(coord:dateTzOffset(coord:nominalTime(), "America/Los_Angeles"), 'yyyyMMdd')}</value>
  </property>

在帮助下,我们可以检查文件是否存在于给定的hdfs位置 fs:exists

${fs:exists(concat(concat(nameNode, path),today))}

在工作流中,我们必须传递协调作业日期值“today”的参数,如下代码所示

<workflow-app name="HIVECoWorkflow" xmlns="uri:oozie:workflow:0.5">
<start to="CheckFile"/>
<decision name="CheckFile">
     <switch>
        <case to="nextOozieTask">
          ${fs:exists(concat(concat(nameNode, path),today))}
        </case>
         <case to="nextOozieTask1">
          ${fs:exists(concat(concat(nameNode, path),yesterday))}
        </case>
        <default to="MailActionFileMissing" />
     </switch>  </decision>

<action name="MailActionFileMissing" cred="hive2">
    <hive2 xmlns="uri:oozie:hive2-action:0.1">
        <job-tracker>${jobTracker}</job-tracker>
        <name-node>${nameNode}</name-node>
        <jdbc-url>jdbc:hive2://quickstart.cloudera:10000/default</jdbc-url>
        <script>/user/cloudera/email/select.hql</script>
        <file>/user/cloudera/hive-site.xml</file>
    </hive2>
    <ok to="End"/>
    <error to="Kill"/>
</action>
<action name="nextOozieTask" cred="hive2">
    <hive2 xmlns="uri:oozie:hive2-action:0.1">
        <job-tracker>${jobTracker}</job-tracker>
        <name-node>${nameNode}</name-node>
        <jdbc-url>jdbc:hive2://quickstart.cloudera:10000/default</jdbc-url>
        <script>/user/cloudera/email/select1.hql</script>
        <file>/user/cloudera/hive-site.xml</file>
    </hive2>
    <ok to="End"/>
    <error to="Kill"/>
</action><kill name="Kill">
    <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<end name="End"/>

在job.properties中,我们可以像下面那样声明所有静态值。

jobStart=2016-08-23T09:50Z
jobEnd=2016-08-23T10:26Z
tzOffset=-8
initialDataset=2016-08-23T09:50Z
oozie.use.system.libpath=True
security_enabled=False
dryrun=True
jobTracker=localhost:8032
nameNode=hdfs://quickstart.cloudera:8020
test=${nameNode}/user/cloudera/email1                
oozie.coord.application.path=${nameNode}/user/cloudera/email1/add-partition-coord-app.xml
path=/user/cloudera/file/input/ravi_
mkshixfv

mkshixfv2#

可能你可以写一个shell脚本来检查hdfs文件是否存在。成功后返回0或1。基于此重写oozie工作流的成功和错误节点。。。

相关问题