是否可以从一个烫手的作业访问底层的org.apache.hadoop.mapreduce.job?

ki0zmccv  于 2021-05-29  发布在  Hadoop
关注(0)|答案(1)|浏览(292)

在我的工作中,我有这样的代码:

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat

class MyJob(args: Args) extends Job(args) {
  FileInputFormat.setInputPathFilter(???, classOf[MyFilter])
  // ... rest of job ...
}

class MyFilter extends PathFilter {
  def accept(path:Path): Boolean = true
}

我的问题是 FileInputFormat.setInputPathFilter 方法的类型必须为 org.apache.hadoop.mapreduce.Job . 如何访问我的烫手作业中的hadoop作业对象?

nwnhqdif

nwnhqdif1#

免责声明

没有办法提取 Job 班级。但你可以(但永远不应该这么做!)提取 JobConf . 之后你就可以使用 FileInputFormat.setInputPathFilter 来自mapreduce.v1 api( org.apache.hadoop.mapred.JobConf )这将允许存档过滤。
但我建议你不要这样做。读答案的结尾,

你怎么能这么做?

覆盖 stepStrategy 方法 scalding.Job 实施 FlowStepStrategy . 例如,此实现允许更改mapreduce作业的名称

override def stepStrategy: Option[FlowStepStrategy[_]] = Some(new FlowStepStrategy[AnyRef]{
  override def apply(flow: Flow[AnyRef], predecessorSteps: util.List[FlowStep[AnyRef]], step: FlowStep[AnyRef]): Unit =
    step.getConfig match {
      case conf: JobConf =>
        # here you can modify the JobConf of each job.
        conf.setJobName(...)
      case _ =>
    }
})

为什么不这样做?

访问jobconf以添加路径筛选仅在使用特定源时有效,如果使用其他源,则会中断。此外,您还将混合不同层次的抽象。我并不是从如何知道您实际需要修改的jobconf开始的(我看到的大多数烫伤性作业都是多步骤的)

如何解决这个问题?

我建议你仔细看看 Source 您正在使用。我敢肯定,有一个函数应用路径过滤期间或之前 Pipe (或 TypedPipe )建筑。

相关问题