在我的工作中,我有这样的代码:
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
class MyJob(args: Args) extends Job(args) {
FileInputFormat.setInputPathFilter(???, classOf[MyFilter])
// ... rest of job ...
}
class MyFilter extends PathFilter {
def accept(path:Path): Boolean = true
}
我的问题是 FileInputFormat.setInputPathFilter
方法的类型必须为 org.apache.hadoop.mapreduce.Job
. 如何访问我的烫手作业中的hadoop作业对象?
1条答案
按热度按时间nwnhqdif1#
免责声明
没有办法提取
Job
班级。但你可以(但永远不应该这么做!)提取JobConf
. 之后你就可以使用FileInputFormat.setInputPathFilter
来自mapreduce.v1 api(org.apache.hadoop.mapred.JobConf
)这将允许存档过滤。但我建议你不要这样做。读答案的结尾,
你怎么能这么做?
覆盖
stepStrategy
方法scalding.Job
实施FlowStepStrategy
. 例如,此实现允许更改mapreduce作业的名称为什么不这样做?
访问jobconf以添加路径筛选仅在使用特定源时有效,如果使用其他源,则会中断。此外,您还将混合不同层次的抽象。我并不是从如何知道您实际需要修改的jobconf开始的(我看到的大多数烫伤性作业都是多步骤的)
如何解决这个问题?
我建议你仔细看看
Source
您正在使用。我敢肯定,有一个函数应用路径过滤期间或之前Pipe
(或TypedPipe
)建筑。