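We have just started using Cascading on Amazon EMR and have it running successfully, but we have hit a fairly simple obstacle that I hope someone can help with.
My code: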
import java.util.Properties;
import cascading.flow.Flow;
import cascading.flow.FlowDef;
import cascading.flow.hadoop.HadoopFlowConnector;
import cascading.pipe.Pipe;
import cascading.property.AppProps;
import cascading.scheme.hadoop.TextLine;
import cascading.tap.Tap;
import cascading.tap.hadoop.Hfs;
import cascading.tuple.Fields;
import cascading.operation.regex.RegexParser;
import cascading.pipe.Each;
import cascading.tap.SinkMode;
public class Main
{
    public static void main( String[] args )
    {
        String inPath = args[ 0 ];
        String outPath = args[ 1 ];
        Properties properties = new Properties();
        AppProps.setApplicationJarClass( properties, Main.class );
        HadoopFlowConnector flowConnector = new HadoopFlowConnector( properties );
        // create the source tap
        TextLine sourceScheme = new TextLine( new Fields( "line" ) );
        Tap inTap = new Hfs( sourceScheme, inPath );
        // create the sink tap
        TextLine sinkScheme = new TextLine( new Fields( "custid", "movieids" ) );
        Tap outTap = new Hfs( sinkScheme, outPath, SinkMode.REPLACE );
        Fields filmFields = new Fields( "custid", "movieids" );
        String filmRegex = "([0-9]:*[,.]*)";
        RegexParser parser = new RegexParser( filmFields, filmRegex );
        Pipe importPipe = new Each( "import", new Fields( "line" ), parser, Fields.RESULTS );
        // connect the taps, pipes, etc., into a flow
        Flow parsedFlow = new HadoopFlowConnector( properties ).connect( inTap, outTap, importPipe );
        // run the flow
        parsedFlow.start();
        parsedFlow.complete();
    }
}
My input (no blank lines):
1:2
2:4
5:1
3:9
My output:
Task TASKID="task_201305241444_0003_m_000000" TASK_TYPE="MAP" TASK_STATUS="FAILED" FINISH_TIME="1369408133954" ERROR="cascading.tuple.TupleException: operation added the wrong number of fields, expected: ['custid', 'movieids'], got result size: 1
at cascading.tuple.TupleEntryCollector.add(TupleEntryCollector.java:82)
at cascading.operation.regex.RegexParser.onFoundGroups(RegexParser.java:168)
at cascading.operation.regex.RegexParser.operate(RegexParser.java:151)
at cascading.flow.stream.FunctionEachStage.receive(FunctionEachStage.java:99)
at cascading.flow.stream.FunctionEachStage.receive(FunctionEachStage.java:39)
at cascading.flow.stream.SourceStage.map(SourceStage.java:102)
at cascading.flow.stream.SourceStage.run(SourceStage.java:58)
at cascading.flow.hadoop.FlowMapper.run(FlowMapper.java:127)
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:441)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:377)
at org.apache.hadoop.mapred.Child$4.run(Child.java:255)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:396)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1132)
at org.apache.hadoop.mapred.Child.main(Child.java:249)
I tested the regex beforehand at http://regexpal.com/
Thanks,
Duncan
1 Answer
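The exception occurs because your regular expression yields only one result, whereas two result fields are expected ("custid" and "movieids"). The expression contains only one group (…), so only one value is produced per line.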
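The group count can be checked outside Cascading with plain java.util.regex. The following is only a minimal sketch: `GroupCountDemo` and `countGroups` are hypothetical names introduced for illustration, and the sketch assumes (as the error message suggests) that RegexParser emits one value per capture group.

```java
import java.util.regex.Pattern;

// Hypothetical helper, not part of Cascading: counts the capture groups
// that a regular expression declares, using only the JDK.
class GroupCountDemo {

    // Returns the number of capturing groups in the given regex.
    static int countGroups(String regex) {
        return Pattern.compile(regex).matcher("").groupCount();
    }

    public static void main(String[] args) {
        String filmRegex = "([0-9]:*[,.]*)"; // the regex from the question
        // Two fields are declared ("custid", "movieids"), so two groups
        // would be needed; this expression declares only one.
        System.out.println("capture groups: " + countGroups(filmRegex));
        // prints: capture groups: 1
    }
}
```

One group against two declared fields matches the "expected: ['custid', 'movieids'], got result size: 1" message in the task log.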
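If you only want to split at the colon, use an expression that contains two groups, one per field, and use \d+ inside each group if your numbers can have more than one digit. Alternatively, and more simply, use the TextDelimited scheme for the input.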
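Both suggestions can be tried on the sample input with the JDK alone. This is a hedged sketch rather than the exact expression from the answer: the pattern `(\d+):(\d+)` is an assumed two-group regex, `ColonSplitDemo` and its methods are hypothetical names, and `parseWithDelimiter` only imitates what a colon-delimited scheme would do per line; it is not Cascading's implementation.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical demo class, not part of Cascading.
class ColonSplitDemo {

    // Assumed pattern: one capture group per declared field,
    // with \d+ so that multi-digit numbers are accepted.
    static final Pattern TWO_GROUPS = Pattern.compile("(\\d+):(\\d+)");

    // Returns {custid, movieids} for a line such as "1:2",
    // or null when the line does not match the pattern.
    static String[] parseWithGroups(String line) {
        Matcher m = TWO_GROUPS.matcher(line);
        if (!m.matches()) {
            return null;
        }
        return new String[] { m.group(1), m.group(2) };
    }

    // Delimiter-based alternative: split the line at the first colon.
    static String[] parseWithDelimiter(String line) {
        return line.split(":", 2);
    }

    public static void main(String[] args) {
        // The sample input from the question.
        for (String line : new String[] { "1:2", "2:4", "5:1", "3:9" }) {
            String[] byRegex = parseWithGroups(line);
            String[] byDelimiter = parseWithDelimiter(line);
            System.out.println(byRegex[0] + "\t" + byRegex[1]
                + " | " + byDelimiter[0] + "\t" + byDelimiter[1]);
        }
    }
}
```

In the flow from the question, the two-group pattern would replace the value of filmRegex. For the TextDelimited route, the source scheme would presumably be constructed with the two fields and ":" as the delimiter, which makes the RegexParser step unnecessary; check the available constructors against the Cascading version you are running.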