我正在使用cascading2创建hadoop作业,并尝试创建一个从单个源开始的流。在对数据应用了几个函数之后,我需要分割流,以便使用这些数据创建两个单独的报告(在两个单独的接收器中)。
//SOURCE
Scheme sourceScheme = new TextLine( new Fields( "line" ) );
Tap source = new Hfs( sourceScheme, input );
//REPORT1 SINK
Scheme report1SinkScheme = new TextDelimited( Fields.ALL, ",","\"" );
Tap report1Sink = new Hfs( report1SinkScheme, output1, SinkMode.REPLACE );
//REPORT2 SINK
Scheme report2SinkScheme = new TextDelimited( Fields.ALL, ",","\"" );
Tap report2Sink = new Hfs( report2SinkScheme, output2, SinkMode.REPLACE );
//INITIAL FUNCTIONS
Pipe firstPipe = new Pipe("firstPipe");
firstPipe = new Each(firstPipe, new Fields("line"), functionA);
firstPipe = new Each(firstPipe, functionB, Fields.ALL);
//REPORT1 FUNCTION
report1Pipe = new Each(firstPipe, Fields.ALL, function1, Fields.RESULTS);
//REPORT2 FUNCTION
report2Pipe = new Each(firstPipe, Fields.ALL, function2, Fields.RESULTS);
//CONNECT FLOW PARTS
FlowDef flowDef = new FlowDef()
.setName("report-flow")
.addSource(firstPipe, source)
.addSink(report1Pipe, report1Sink)
.addSink(report2Pipe, report2Sink);
new HadoopFlowConnector( properties ).connect( flowDef ).complete();
目前这给了我一个错误“java.lang.illegalargumentexception:cannot add duplicate sink:firstpipe”,但即使在处理了一段时间之后,我仍然会遇到与流设置有关的各种其他问题。
有人能解释一下如何构造这种形式的流(一个源,两个汇)吗?我需要创建级联吗?或者在分割之前需要中间接收器来保存数据?
请帮帮我!
2条答案
按热度按时间hgc7kmma1#
您可以使用级联文档中提到的拆分模式。举个例子:
mzmfm0qo2#
拆分模式在级联用户指南中:http://docs.cascading.org/cascading/2.1/userguide/htmlsingle/#n21362
另一个(更简单的)例子包含在“不耐烦者的级联”第5部分和第6部分:
https://github.com/cascading/impatient/wiki/part-5
https://github.com/cascading/impatient/wiki/part-6
关于上面显示的代码,有一点是它似乎缺少
report1Pipe
以及report2Pipe
. 要使用拆分模式,每个分支都需要一个名称,并且名称必须不同。引发异常的原因是有两个分支都继承了管道程序集中早期版本的相同名称。例如,这些
flowDef.addSink(..)
调用对flow planner不明确。所以在“不耐烦”的第5部分中,看看“d”、“df”和“tf”分支是如何在操作中命名的。
级联要求这种命名似乎有点违反直觉,但在大型复杂工作流中,当您将故障陷阱、调试等附加到特定分支时,这种命名就变得非常重要。
或者,clojure中的cascalog dsl更具声明性,因此它直接由语言处理——分支是子查询,陷阱等在子查询的闭包中处理。