使用luigiphon运行hadoopjar

wvmv3b1j  于 2021-05-30  发布在  Hadoop
关注(0)|答案(1)|浏览(429)

我需要使用python中的luigi运行hadoopjar作业。我搜索并找到了用luigi编写mapper和reducer的例子,但没有找到直接运行hadoopjar的例子。
我需要运行一个直接编译的hadoopjar。我该怎么做?

xfb7svmp

xfb7svmp1#

你需要使用 luigi.contrib.hadoop_jar 包(代码)。
特别是,你需要扩展 HadoopJarJobTask . 例如:

  1. from luigi.contrib.hadoop_jar import HadoopJarJobTask
  2. from luigi.contrib.hdfs.target import HdfsTarget
  3. class TextExtractorTask(HadoopJarJobTask):
  4. def output(self):
  5. return HdfsTarget('data/processed/')
  6. def jar(self):
  7. return 'jobfile.jar'
  8. def main(self):
  9. return 'com.ololo.HadoopJob'
  10. def args(self):
  11. return ['--param1', '1', '--param2', '2']

您还可以将使用maven构建jar文件包括到工作流中:

  1. import luigi
  2. from luigi.contrib.hadoop_jar import HadoopJarJobTask
  3. from luigi.contrib.hdfs.target import HdfsTarget
  4. from luigi.file import LocalTarget
  5. import subprocess
  6. import os
  7. class BuildJobTask(luigi.Task):
  8. def output(self):
  9. return LocalTarget('target/jobfile.jar')
  10. def run(self):
  11. subprocess.call(['mvn', 'clean', 'package', '-DskipTests'])
  12. class YourHadoopTask(HadoopJarJobTask):
  13. def output(self):
  14. return HdfsTarget('data/processed/')
  15. def jar(self):
  16. return self.input().fn
  17. def main(self):
  18. return 'com.ololo.HadoopJob'
  19. def args(self):
  20. return ['--param1', '1', '--param2', '2']
  21. def requires(self):
  22. return BuildJobTask()
展开查看全部

相关问题