使用luigiphon运行hadoopjar

wvmv3b1j  于 2021-05-30  发布在  Hadoop
关注(0)|答案(1)|浏览(384)

我需要使用python中的luigi运行hadoopjar作业。我搜索并找到了用luigi编写mapper和reducer的例子,但没有找到直接运行hadoopjar的例子。
我需要运行一个直接编译的hadoopjar。我该怎么做?

xfb7svmp

xfb7svmp1#

你需要使用 luigi.contrib.hadoop_jar 包(代码)。
特别是,你需要扩展 HadoopJarJobTask . 例如:

from luigi.contrib.hadoop_jar import HadoopJarJobTask
from luigi.contrib.hdfs.target import HdfsTarget

class TextExtractorTask(HadoopJarJobTask):
    def output(self):
        return HdfsTarget('data/processed/')

    def jar(self):
        return 'jobfile.jar'

    def main(self):
        return 'com.ololo.HadoopJob'

    def args(self):
        return ['--param1', '1', '--param2', '2']

您还可以将使用maven构建jar文件包括到工作流中:

import luigi
from luigi.contrib.hadoop_jar import HadoopJarJobTask
from luigi.contrib.hdfs.target import HdfsTarget
from luigi.file import LocalTarget

import subprocess
import os

class BuildJobTask(luigi.Task):
    def output(self):
        return LocalTarget('target/jobfile.jar')

    def run(self):
        subprocess.call(['mvn', 'clean', 'package', '-DskipTests'])

class YourHadoopTask(HadoopJarJobTask):
    def output(self):
        return HdfsTarget('data/processed/')

    def jar(self):
        return self.input().fn

    def main(self):
        return 'com.ololo.HadoopJob'

    def args(self):
        return ['--param1', '1', '--param2', '2']

    def requires(self):
        return BuildJobTask()

相关问题