我需要使用python中的luigi运行hadoopjar作业。我搜索并找到了用luigi编写mapper和reducer的例子,但没有找到直接运行hadoopjar的例子。我需要运行一个直接编译的hadoopjar。我该怎么做?
xfb7svmp1#
你需要使用 luigi.contrib.hadoop_jar 包(代码)。特别是,你需要扩展 HadoopJarJobTask . 例如:
luigi.contrib.hadoop_jar
HadoopJarJobTask
from luigi.contrib.hadoop_jar import HadoopJarJobTaskfrom luigi.contrib.hdfs.target import HdfsTargetclass TextExtractorTask(HadoopJarJobTask): def output(self): return HdfsTarget('data/processed/') def jar(self): return 'jobfile.jar' def main(self): return 'com.ololo.HadoopJob' def args(self): return ['--param1', '1', '--param2', '2']
from luigi.contrib.hadoop_jar import HadoopJarJobTask
from luigi.contrib.hdfs.target import HdfsTarget
class TextExtractorTask(HadoopJarJobTask):
def output(self):
return HdfsTarget('data/processed/')
def jar(self):
return 'jobfile.jar'
def main(self):
return 'com.ololo.HadoopJob'
def args(self):
return ['--param1', '1', '--param2', '2']
您还可以将使用maven构建jar文件包括到工作流中:
import luigifrom luigi.contrib.hadoop_jar import HadoopJarJobTaskfrom luigi.contrib.hdfs.target import HdfsTargetfrom luigi.file import LocalTargetimport subprocessimport osclass BuildJobTask(luigi.Task): def output(self): return LocalTarget('target/jobfile.jar') def run(self): subprocess.call(['mvn', 'clean', 'package', '-DskipTests'])class YourHadoopTask(HadoopJarJobTask): def output(self): return HdfsTarget('data/processed/') def jar(self): return self.input().fn def main(self): return 'com.ololo.HadoopJob' def args(self): return ['--param1', '1', '--param2', '2'] def requires(self): return BuildJobTask()
import luigi
from luigi.file import LocalTarget
import subprocess
import os
class BuildJobTask(luigi.Task):
return LocalTarget('target/jobfile.jar')
def run(self):
subprocess.call(['mvn', 'clean', 'package', '-DskipTests'])
class YourHadoopTask(HadoopJarJobTask):
return self.input().fn
def requires(self):
return BuildJobTask()
1条答案
按热度按时间xfb7svmp1#
你需要使用
luigi.contrib.hadoop_jar
包(代码)。特别是,你需要扩展
HadoopJarJobTask
. 例如:您还可以将使用maven构建jar文件包括到工作流中: