cascading.tap.Tap.taps()方法的使用及代码示例

x33g5p2x  于2022-01-30 转载在 其他  
字(20.7k)|赞(0)|评价(0)|浏览(101)

本文整理了Java中cascading.tap.Tap.taps()方法的一些代码示例,展示了Tap.taps()的具体用法。这些代码示例主要来源于GitHub/Stack Overflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。Tap.taps()方法的具体详情如下:
包路径:cascading.tap.Tap
类名称:Tap
方法名:taps

Tap.taps介绍

[英]Convenience function to make an array of Tap instances.
[中]便捷方法,用于创建Tap实例数组。

代码示例

代码示例来源:origin: cwensel/cascading

/**
 * Method tapsMap wraps the given name in a one-element array, wraps the given tap via
 * {@code Tap.taps}, and delegates to the array-based tapsMap overload.
 *
 * @param name of type String, the name to pair with the tap
 * @param tap  of type Tap, the tap to pair with the name
 * @return Map as returned by the array-based overload
 */
public static Map<String, Tap> tapsMap( String name, Tap tap )
 {
 String[] names = new String[]{name};
 Tap[] taps = Tap.taps( tap );

 return tapsMap( names, taps );
 }

代码示例来源:origin: cwensel/cascading

/**
 * Method tapsMap wraps the given pipe via {@code Pipe.pipes} and the given tap via
 * {@code Tap.taps}, then delegates to the array-based tapsMap overload.
 *
 * @param pipe of type Pipe, the pipe to pair with the tap
 * @param tap  of type Tap, the tap to pair with the pipe
 * @return Map as returned by the array-based overload
 */
public static Map<String, Tap> tapsMap( Pipe pipe, Tap tap )
 {
 Pipe[] pipes = Pipe.pipes( pipe );
 Tap[] taps = Tap.taps( tap );

 return tapsMap( pipes, taps );
 }

代码示例来源:origin: cwensel/cascading

/**
 * Builds two upstream flows plus a flow created from both of their sinks, runs all three
 * as a single cascade, and validates the length of the combined flow's output.
 */
@Test
public void testMultiTapCascade() throws IOException
 {
 getPlatform().copyFromLocal( inputFileIps );

 String basePath = "multitap";

 Flow firstFlow = previousMultiTapFlow( basePath, "first" );
 Flow secondFlow = previousMultiTapFlow( basePath, "second" );

 // the sinks of the two upstream flows are handed to the multi-tap flow
 Tap[] upstreamSinks = Tap.taps( firstFlow.getSink(), secondFlow.getSink() );
 Flow combined = multiTapFlow( upstreamSinks, basePath );

 CascadeConnector connector = new CascadeConnector( getProperties() );
 Cascade cascade = connector.connect( combined, firstFlow, secondFlow );

 cascade.start();
 cascade.complete();

 validateLength( combined, 40 );
 }

代码示例来源:origin: cascading/cascading-platform

/**
 * Builds two upstream flows plus a flow created from both of their sinks, runs all three
 * as a single cascade, and validates the length of the combined flow's output.
 */
@Test
public void testMultiTapCascade() throws IOException
 {
 getPlatform().copyFromLocal( inputFileIps );

 String basePath = "multitap";

 Flow firstFlow = previousMultiTapFlow( basePath, "first" );
 Flow secondFlow = previousMultiTapFlow( basePath, "second" );

 // the sinks of the two upstream flows are handed to the multi-tap flow
 Tap[] upstreamSinks = Tap.taps( firstFlow.getSink(), secondFlow.getSink() );
 Flow combined = multiTapFlow( upstreamSinks, basePath );

 CascadeConnector connector = new CascadeConnector( getProperties() );
 Cascade cascade = connector.connect( combined, firstFlow, secondFlow );

 cascade.start();
 cascade.complete();

 validateLength( combined, 40 );
 }

代码示例来源:origin: cwensel/cascading

/**
 * Connects a flow that CoGroups the "lhs" and "rhs" pipes on field position 0 and writes
 * to the text file at the "third" output path.
 *
 * @param lhs tap bound to the "lhs" pipe
 * @param rhs tap bound to the "rhs" pipe
 * @return the connected Flow
 */
private Flow thirdFlow( Tap lhs, Tap rhs )
 {
 Pipe left = new Pipe( "lhs" );
 Pipe right = new Pipe( "rhs" );

 Pipe joined = new CoGroup( left, new Fields( 0 ), right, new Fields( 0 ), Fields.size( 2 ) );

 Tap sink = getPlatform().getTextFile( getOutputPath( "third" ), SinkMode.REPLACE );

 // pair each head pipe with its source tap, position by position
 Pipe[] heads = Pipe.pipes( left, right );
 Tap[] sources = Tap.taps( lhs, rhs );

 return getPlatform().getFlowConnector().connect( Cascades.tapsMap( heads, sources ), sink, joined );
 }

代码示例来源:origin: cascading/cascading-platform

/**
 * Connects a flow that CoGroups the "lhs" and "rhs" pipes on field position 0 and writes
 * to the text file at the "third" output path.
 *
 * @param lhs tap bound to the "lhs" pipe
 * @param rhs tap bound to the "rhs" pipe
 * @return the connected Flow
 */
private Flow thirdFlow( Tap lhs, Tap rhs )
 {
 Pipe left = new Pipe( "lhs" );
 Pipe right = new Pipe( "rhs" );

 Pipe joined = new CoGroup( left, new Fields( 0 ), right, new Fields( 0 ), Fields.size( 2 ) );

 Tap sink = getPlatform().getTextFile( getOutputPath( "third" ), SinkMode.REPLACE );

 // pair each head pipe with its source tap, position by position
 Pipe[] heads = Pipe.pipes( left, right );
 Tap[] sources = Tap.taps( lhs, rhs );

 return getPlatform().getFlowConnector().connect( Cascades.tapsMap( heads, sources ), sink, joined );
 }

代码示例来源:origin: cwensel/cascading

/**
 * Splits the lines of two text inputs into "num" and "char", applies Unique on "num"
 * across both branches, and validates the sink contents against a line pattern.
 */
@Test
public void testUniqueMerge() throws IOException
 {
 getPlatform().copyFromLocal( inputFileLhs );
 getPlatform().copyFromLocal( inputFileRhs );

 Tap lhsSource = getPlatform().getTextFile( inputFileLhs );
 Tap rhsSource = getPlatform().getTextFile( inputFileRhs );
 Tap output = getPlatform().getTextFile( new Fields( "item" ), new Fields( "num", "char" ), getOutputPath( "uniquemerge-nondeterministic" ), SinkMode.REPLACE );

 // one splitter instance is shared by both branches
 Function lineSplitter = new RegexSplitter( new Fields( "num", "char" ), " " );

 Pipe lhsPipe = new Each( new Pipe( "lhs" ), new Fields( "line" ), lineSplitter );
 Pipe rhsPipe = new Each( new Pipe( "rhs" ), new Fields( "line" ), lineSplitter );

 Pipe uniquePipe = new Unique( Pipe.pipes( lhsPipe, rhsPipe ), new Fields( "num" ) );

 Tap[] sourceTaps = Tap.taps( lhsSource, rhsSource );
 Map<String, Tap> sources = Cascades.tapsMap( Pipe.pipes( lhsPipe, rhsPipe ), sourceTaps );

 Flow flow = getPlatform().getFlowConnector().connect( sources, output, uniquePipe );

 flow.complete();

 validateLength( flow, 5, 1, Pattern.compile( "^\\d+\\s\\w+$" ) );
 }

代码示例来源:origin: cascading/cascading-platform

/**
 * Splits the lines of two text inputs into "num" and "char", applies Unique on "num"
 * across both branches, and validates the sink contents against a line pattern.
 */
@Test
public void testUniqueMerge() throws IOException
 {
 getPlatform().copyFromLocal( inputFileLhs );
 getPlatform().copyFromLocal( inputFileRhs );

 Tap lhsSource = getPlatform().getTextFile( inputFileLhs );
 Tap rhsSource = getPlatform().getTextFile( inputFileRhs );
 Tap output = getPlatform().getTextFile( new Fields( "item" ), new Fields( "num", "char" ), getOutputPath( "uniquemerge-nondeterministic" ), SinkMode.REPLACE );

 // one splitter instance is shared by both branches
 Function lineSplitter = new RegexSplitter( new Fields( "num", "char" ), " " );

 Pipe lhsPipe = new Each( new Pipe( "lhs" ), new Fields( "line" ), lineSplitter );
 Pipe rhsPipe = new Each( new Pipe( "rhs" ), new Fields( "line" ), lineSplitter );

 Pipe uniquePipe = new Unique( Pipe.pipes( lhsPipe, rhsPipe ), new Fields( "num" ) );

 Tap[] sourceTaps = Tap.taps( lhsSource, rhsSource );
 Map<String, Tap> sources = Cascades.tapsMap( Pipe.pipes( lhsPipe, rhsPipe ), sourceTaps );

 Flow flow = getPlatform().getFlowConnector().connect( sources, output, uniquePipe );

 flow.complete();

 validateLength( flow, 5, 1, Pattern.compile( "^\\d+\\s\\w+$" ) );
 }

代码示例来源:origin: cwensel/cascading

/**
 * Expects the flow connector to reject a flow whose two tail pipes are both bound to the
 * same sink tap; the test fails if {@code connect} returns normally.
 *
 * @throws Exception declared by the test signature; exceptions from {@code connect} are expected and swallowed
 */
@Test
public void testDupeTailNames() throws Exception
 {
 Tap source = getPlatform().getTextFile( inputFileJoined );
 Tap sink = getPlatform().getTextFile( getOutputPath( "unknown" ), SinkMode.REPLACE );
 Pipe pipe = new Pipe( "test" );
 pipe = new Each( pipe, new Fields( "line" ), new RegexSplitter( " " ) );
 Pipe group = new GroupBy( pipe, Fields.size( 3 ) );
 // NOTE(review): each "tail" Pipe below is immediately replaced by an Each built on group,
 // not on the named pipe - confirm this is intentional before changing it.
 Pipe lhs = new Pipe( "tail", group );
 lhs = new Each( group, new Fields( "line" ), new RegexSplitter( " " ) );
 Pipe rhs = new Pipe( "tail", group );
 rhs = new Each( group, new Fields( "line" ), new RegexSplitter( " " ) );
 Map<String, Tap> sinks = Cascades.tapsMap( Pipe.pipes( lhs, rhs ), Tap.taps( sink, sink ) );
 try
  {
  getPlatform().getFlowConnector().connect( source, sinks, Pipe.pipes( lhs, rhs ) );
  // this test is about tails, so the failure message must say so
  fail( "did not fail on dupe tail names" );
  }
 catch( Exception ignored )
  {
  // expected: connect must throw for this pipe/sink arrangement
  }
 }

代码示例来源:origin: cwensel/cascading

/**
 * Applies CountBy on "char" across two delimited inputs (the right side is lower-cased
 * first) and compares the sink tuples, in order, against the expected counts.
 *
 * @throws IOException if copying the inputs or closing the sink iterator fails
 */
@Test
public void testCountMerge() throws IOException
 {
 getPlatform().copyFromLocal( inputFileLhs );
 getPlatform().copyFromLocal( inputFileRhs );
 Tap lhs = getPlatform().getDelimitedFile( new Fields( "num", "char" ), " ", inputFileLhs );
 Tap rhs = getPlatform().getDelimitedFile( new Fields( "num", "char" ), " ", inputFileRhs );
 Tap sink = getPlatform().getDelimitedFile( new Fields( "char", "count" ), "\t",
  new Class[]{String.class, Integer.TYPE}, getOutputPath( "mergecount" ), SinkMode.REPLACE );
 Pipe lhsPipe = new Pipe( "count-lhs" );
 Pipe rhsPipe = new Pipe( "count-rhs" );
 // lower-case the "char" values of the right-hand branch in place
 rhsPipe = new Each( rhsPipe, new Fields( "char" ), new ExpressionFunction( Fields.ARGS, "$0.toLowerCase()", String.class ), Fields.REPLACE );
 Pipe countPipe = new CountBy( Pipe.pipes( lhsPipe, rhsPipe ), new Fields( "char" ), new Fields( "count" ), 2 );
 Map<String, Tap> tapMap = Cascades.tapsMap( Pipe.pipes( lhsPipe, rhsPipe ), Tap.taps( lhs, rhs ) );
 Flow flow = getPlatform().getFlowConnector().connect( tapMap, sink, countPipe );
 flow.complete();
 validateLength( flow, 5, 2, Pattern.compile( "^\\w+\\s\\d+$" ) );
 Tuple[] results = new Tuple[]{
  new Tuple( "a", 4 ),
  new Tuple( "b", 8 ),
  new Tuple( "c", 8 ),
  new Tuple( "d", 4 ),
  new Tuple( "e", 2 ),
  };
 TupleEntryIterator iterator = flow.openSink();
 int count = 0;
 try
  {
  while( iterator.hasNext() )
   assertEquals( results[ count++ ], iterator.next().getTuple() );
  }
 finally
  {
  // close the sink even when an assertion above throws
  iterator.close();
  }
 // guard against the loop passing on a sink that produced too few tuples
 assertEquals( "unexpected number of result tuples", results.length, count );
 }

代码示例来源:origin: cascading/cascading-platform

/**
 * Applies CountBy on "char" across two delimited inputs (the right side is lower-cased
 * first) and compares the sink tuples, in order, against the expected counts.
 *
 * @throws IOException if copying the inputs or closing the sink iterator fails
 */
@Test
public void testCountMerge() throws IOException
 {
 getPlatform().copyFromLocal( inputFileLhs );
 getPlatform().copyFromLocal( inputFileRhs );
 Tap lhs = getPlatform().getDelimitedFile( new Fields( "num", "char" ), " ", inputFileLhs );
 Tap rhs = getPlatform().getDelimitedFile( new Fields( "num", "char" ), " ", inputFileRhs );
 Tap sink = getPlatform().getDelimitedFile( new Fields( "char", "count" ), "\t",
  new Class[]{String.class, Integer.TYPE}, getOutputPath( "mergecount" ), SinkMode.REPLACE );
 Pipe lhsPipe = new Pipe( "count-lhs" );
 Pipe rhsPipe = new Pipe( "count-rhs" );
 // lower-case the "char" values of the right-hand branch in place
 rhsPipe = new Each( rhsPipe, new Fields( "char" ), new ExpressionFunction( Fields.ARGS, "$0.toLowerCase()", String.class ), Fields.REPLACE );
 Pipe countPipe = new CountBy( Pipe.pipes( lhsPipe, rhsPipe ), new Fields( "char" ), new Fields( "count" ), 2 );
 Map<String, Tap> tapMap = Cascades.tapsMap( Pipe.pipes( lhsPipe, rhsPipe ), Tap.taps( lhs, rhs ) );
 Flow flow = getPlatform().getFlowConnector().connect( tapMap, sink, countPipe );
 flow.complete();
 validateLength( flow, 5, 2, Pattern.compile( "^\\w+\\s\\d+$" ) );
 Tuple[] results = new Tuple[]{
  new Tuple( "a", 4 ),
  new Tuple( "b", 8 ),
  new Tuple( "c", 8 ),
  new Tuple( "d", 4 ),
  new Tuple( "e", 2 ),
  };
 TupleEntryIterator iterator = flow.openSink();
 int count = 0;
 try
  {
  while( iterator.hasNext() )
   assertEquals( results[ count++ ], iterator.next().getTuple() );
  }
 finally
  {
  // close the sink even when an assertion above throws
  iterator.close();
  }
 // guard against the loop passing on a sink that produced too few tuples
 assertEquals( "unexpected number of result tuples", results.length, count );
 }

代码示例来源:origin: cascading/cascading-platform

/**
 * Expects the flow connector to reject a flow whose two tail pipes are both bound to the
 * same sink tap; the test fails if {@code connect} returns normally.
 *
 * @throws Exception declared by the test signature; exceptions from {@code connect} are expected and swallowed
 */
@Test
public void testDupeTailNames() throws Exception
 {
 Tap source = getPlatform().getTextFile( inputFileJoined );
 Tap sink = getPlatform().getTextFile( getOutputPath( "unknown" ), SinkMode.REPLACE );
 Pipe pipe = new Pipe( "test" );
 pipe = new Each( pipe, new Fields( "line" ), new RegexSplitter( " " ) );
 Pipe group = new GroupBy( pipe, Fields.size( 3 ) );
 // NOTE(review): each "tail" Pipe below is immediately replaced by an Each built on group,
 // not on the named pipe - confirm this is intentional before changing it.
 Pipe lhs = new Pipe( "tail", group );
 lhs = new Each( group, new Fields( "line" ), new RegexSplitter( " " ) );
 Pipe rhs = new Pipe( "tail", group );
 rhs = new Each( group, new Fields( "line" ), new RegexSplitter( " " ) );
 Map<String, Tap> sinks = Cascades.tapsMap( Pipe.pipes( lhs, rhs ), Tap.taps( sink, sink ) );
 try
  {
  getPlatform().getFlowConnector().connect( source, sinks, Pipe.pipes( lhs, rhs ) );
  // this test is about tails, so the failure message must say so
  fail( "did not fail on dupe tail names" );
  }
 catch( Exception ignored )
  {
  // expected: connect must throw for this pipe/sink arrangement
  }
 }

代码示例来源:origin: cwensel/cascading

/**
 * Applies SumBy over "num" grouped on "char" across two delimited inputs (the right side
 * is lower-cased first) and compares the sink tuples, in order, against the expected sums.
 *
 * @throws IOException if copying the inputs or closing the sink iterator fails
 */
@Test
public void testSumMerge() throws IOException
 {
 getPlatform().copyFromLocal( inputFileLhs );
 getPlatform().copyFromLocal( inputFileRhs );
 Tap lhs = getPlatform().getDelimitedFile( new Fields( "num", "char" ), " ", inputFileLhs );
 Tap rhs = getPlatform().getDelimitedFile( new Fields( "num", "char" ), " ", inputFileRhs );
 Tap sink = getPlatform().getDelimitedFile( new Fields( "char", "sum" ), "\t",
  new Class[]{String.class, Integer.TYPE}, getOutputPath( "mergesum" ), SinkMode.REPLACE );
 Pipe lhsPipe = new Pipe( "sum-lhs" );
 Pipe rhsPipe = new Pipe( "sum-rhs" );
 // lower-case the "char" values of the right-hand branch in place
 rhsPipe = new Each( rhsPipe, new Fields( "char" ), new ExpressionFunction( Fields.ARGS, "$0.toLowerCase()", String.class ), Fields.REPLACE );
 Pipe sumPipe = new SumBy( Pipe.pipes( lhsPipe, rhsPipe ), new Fields( "char" ), new Fields( "num" ), new Fields( "sum" ), long.class, 2 );
 Map<String, Tap> tapMap = Cascades.tapsMap( Pipe.pipes( lhsPipe, rhsPipe ), Tap.taps( lhs, rhs ) );
 Flow flow = getPlatform().getFlowConnector().connect( tapMap, sink, sumPipe );
 flow.complete();
 validateLength( flow, 5, 2, Pattern.compile( "^\\w+\\s\\d+$" ) );
 Tuple[] results = new Tuple[]{
  new Tuple( "a", 12 ),
  new Tuple( "b", 24 ),
  new Tuple( "c", 20 ),
  new Tuple( "d", 12 ),
  new Tuple( "e", 10 ),
  };
 TupleEntryIterator iterator = flow.openSink();
 int count = 0;
 try
  {
  while( iterator.hasNext() )
   assertEquals( results[ count++ ], iterator.next().getTuple() );
  }
 finally
  {
  // close the sink even when an assertion above throws
  iterator.close();
  }
 // guard against the loop passing on a sink that produced too few tuples
 assertEquals( "unexpected number of result tuples", results.length, count );
 }

代码示例来源:origin: cascading/cascading-platform

/**
 * Applies AverageBy over "num" grouped on "char" across two delimited inputs (the right
 * side is lower-cased first) and compares the sink tuples, in order, against the expected
 * averages.
 *
 * @throws IOException if copying the inputs or closing the sink iterator fails
 */
@Test
public void testAverageMerge() throws IOException
 {
 getPlatform().copyFromLocal( inputFileLhs );
 getPlatform().copyFromLocal( inputFileRhs );
 Tap lhs = getPlatform().getDelimitedFile( new Fields( "num", "char" ), " ", inputFileLhs );
 Tap rhs = getPlatform().getDelimitedFile( new Fields( "num", "char" ), " ", inputFileRhs );
 Tap sink = getPlatform().getDelimitedFile( new Fields( "char", "average" ), "\t",
  new Class[]{String.class, Double.TYPE}, getOutputPath( "mergeaverage" ), SinkMode.REPLACE );
 Pipe lhsPipe = new Pipe( "average-lhs" );
 Pipe rhsPipe = new Pipe( "average-rhs" );
 // lower-case the "char" values of the right-hand branch in place
 rhsPipe = new Each( rhsPipe, new Fields( "char" ), new ExpressionFunction( Fields.ARGS, "$0.toLowerCase()", String.class ), Fields.REPLACE );
 Pipe averagePipe = new AverageBy( Pipe.pipes( lhsPipe, rhsPipe ), new Fields( "char" ), new Fields( "num" ), new Fields( "average" ), 2 );
 Map<String, Tap> tapMap = Cascades.tapsMap( Pipe.pipes( lhsPipe, rhsPipe ), Tap.taps( lhs, rhs ) );
 Flow flow = getPlatform().getFlowConnector().connect( tapMap, sink, averagePipe );
 flow.complete();
 validateLength( flow, 5, 2, Pattern.compile( "^\\w+\\s[\\d.]+$" ) );
 Tuple[] results = new Tuple[]{
  new Tuple( "a", (double) 12 / 4 ),
  new Tuple( "b", (double) 24 / 8 ),
  new Tuple( "c", (double) 20 / 8 ),
  new Tuple( "d", (double) 12 / 4 ),
  new Tuple( "e", (double) 10 / 2 ),
  };
 TupleEntryIterator iterator = flow.openSink();
 int count = 0;
 try
  {
  while( iterator.hasNext() )
   assertEquals( results[ count++ ], iterator.next().getTuple() );
  }
 finally
  {
  // close the sink even when an assertion above throws
  iterator.close();
  }
 // guard against the loop passing on a sink that produced too few tuples
 assertEquals( "unexpected number of result tuples", results.length, count );
 }

代码示例来源:origin: cwensel/cascading

/**
 * Chains three named segments ("first", "second", "third"), each ending in a GroupBy on
 * "ip", and binds one tab-delimited sink to each name. Original note: if the sinks have
 * the same scheme as a temp tap, replace the temp tap.
 *
 * @throws Exception
 */
@Test
public void testChainedTaps() throws Exception
 {
 getPlatform().copyFromLocal( inputFileApache );

 Tap source = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileApache );

 // segment "first": parse the leading non-space token of each line as "ip"
 Pipe first = new Each( new Pipe( "first" ), new Fields( "line" ), new RegexParser( new Fields( "ip" ), "^[^ ]*" ), new Fields( "ip" ) );
 first = new GroupBy( first, new Fields( "ip" ) );

 // segment "second": filter on "7"
 Pipe second = new Each( new Pipe( "second", first ), new Fields( "ip" ), new RegexFilter( "7" ) );
 second = new GroupBy( second, new Fields( "ip" ) );

 // segment "third": filter on "6"
 Pipe third = new Each( new Pipe( "third", second ), new Fields( "ip" ), new RegexFilter( "6" ) );
 third = new GroupBy( third, new Fields( "ip" ) );

 Tap sinkFirst = getPlatform().getTabDelimitedFile( new Fields( "ip" ), getOutputPath( "chainedtaps/first" ), SinkMode.REPLACE );
 Tap sinkSecond = getPlatform().getTabDelimitedFile( new Fields( "ip" ), getOutputPath( "chainedtaps/second" ), SinkMode.REPLACE );
 Tap sinkThird = getPlatform().getTabDelimitedFile( new Fields( "ip" ), getOutputPath( "chainedtaps/third" ), SinkMode.REPLACE );

 String[] sinkNames = new String[]{"first", "second", "third"};
 Tap[] sinkTaps = Tap.taps( sinkFirst, sinkSecond, sinkThird );
 Map<String, Tap> sinks = Cascades.tapsMap( sinkNames, sinkTaps );

 FlowConnector connector = getPlatform().getFlowConnector();
 Flow flow = connector.connect( source, sinks, third );

 if( getPlatform().isMapReduce() )
  {
  assertEquals( "wrong number of steps", 3, flow.getFlowSteps().size() );
  }

 flow.complete();

 validateLength( flow, 3 );
 }
}

代码示例来源:origin: cwensel/cascading

// Excerpt from a larger method: heads, tails, source and the four sink taps are declared outside this fragment.
// Pair the entries of heads with two references to the same source tap.
Map<String, Tap> sources = Cascades.tapsMap( heads, Tap.taps( source, source ) );
// Pair the entries of tails with the inner, outer, left and right sink taps, in that order.
Map<String, Tap> sinks = Cascades.tapsMap( tails, Tap.taps( innerSink, outerSink, leftSink, rightSink ) );

代码示例来源:origin: cwensel/cascading

/**
 * Parses, groups, counts and filters on "ip", then splits the stream into "left" and
 * "right" branches, each bound to its own sink, and validates a length of 1 for each
 * named sink.
 *
 * @throws Exception if flow setup or execution fails
 */
@Test
public void testSplitComplex() throws Exception
 {
 getPlatform().copyFromLocal( inputFileApache );
 // 46 192
 Tap source = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileApache );
 Tap sink1 = getPlatform().getTextFile( getOutputPath( "splitcomp1" ), SinkMode.REPLACE );
 Tap sink2 = getPlatform().getTextFile( getOutputPath( "splitcomp2" ), SinkMode.REPLACE );
 Pipe pipe = new Pipe( "split" );
 pipe = new Each( pipe, new Fields( "line" ), new RegexParser( new Fields( "ip" ), "^[^ ]*" ), new Fields( "ip" ) );
 pipe = new GroupBy( pipe, new Fields( "ip" ) );
 pipe = new Every( pipe, new Fields( "ip" ), new Count(), new Fields( "ip", "count" ) );
 pipe = new Each( pipe, new Fields( "ip" ), new RegexFilter( "^68.*" ) );
 // both branches hang off the same filtered pipe
 Pipe left = new Each( new Pipe( "left", pipe ), new Fields( "ip" ), new RegexFilter( ".*46.*" ) );
 Pipe right = new Each( new Pipe( "right", pipe ), new Fields( "ip" ), new RegexFilter( ".*102.*" ) );
 // parameterized maps instead of raw Map, matching what Cascades.tapsMap returns
 Map<String, Tap> sources = Cascades.tapsMap( "split", source );
 Map<String, Tap> sinks = Cascades.tapsMap( Pipe.pipes( left, right ), Tap.taps( sink1, sink2 ) );
 Flow flow = getPlatform().getFlowConnector().connect( sources, sinks, left, right );
 flow.complete();
 validateLength( flow, 1, "left" );
 validateLength( flow, 1, "right" );
 }

代码示例来源:origin: cascading/cascading-platform

/**
 * Parses, groups, counts and filters on "ip", then splits the stream into "left" and
 * "right" branches, each bound to its own sink, and validates a length of 1 for each
 * named sink.
 *
 * @throws Exception if flow setup or execution fails
 */
@Test
public void testSplitComplex() throws Exception
 {
 getPlatform().copyFromLocal( inputFileApache );
 // 46 192
 Tap source = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileApache );
 Tap sink1 = getPlatform().getTextFile( getOutputPath( "splitcomp1" ), SinkMode.REPLACE );
 Tap sink2 = getPlatform().getTextFile( getOutputPath( "splitcomp2" ), SinkMode.REPLACE );
 Pipe pipe = new Pipe( "split" );
 pipe = new Each( pipe, new Fields( "line" ), new RegexParser( new Fields( "ip" ), "^[^ ]*" ), new Fields( "ip" ) );
 pipe = new GroupBy( pipe, new Fields( "ip" ) );
 pipe = new Every( pipe, new Fields( "ip" ), new Count(), new Fields( "ip", "count" ) );
 pipe = new Each( pipe, new Fields( "ip" ), new RegexFilter( "^68.*" ) );
 // both branches hang off the same filtered pipe
 Pipe left = new Each( new Pipe( "left", pipe ), new Fields( "ip" ), new RegexFilter( ".*46.*" ) );
 Pipe right = new Each( new Pipe( "right", pipe ), new Fields( "ip" ), new RegexFilter( ".*102.*" ) );
 // parameterized maps instead of raw Map, matching what Cascades.tapsMap returns
 Map<String, Tap> sources = Cascades.tapsMap( "split", source );
 Map<String, Tap> sinks = Cascades.tapsMap( Pipe.pipes( left, right ), Tap.taps( sink1, sink2 ) );
 Flow flow = getPlatform().getFlowConnector().connect( sources, sinks, left, right );
 flow.complete();
 validateLength( flow, 1, "left" );
 validateLength( flow, 1, "right" );
 }

代码示例来源:origin: cascading/cascading-platform

/**
 * Splits a parsed, grouped, counted and filtered stream into a "left" branch and a "right"
 * branch, splits the grouped "right" branch again into "rightLeft" and "rightRight", binds
 * a sink to each of the three tails, and validates a length of 1 for each named sink.
 *
 * @throws Exception if flow setup or execution fails
 */
@Test
public void testSplitMultiple() throws Exception
 {
 getPlatform().copyFromLocal( inputFileApache );
 // 46 192
 Tap source = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileApache );
 Tap sinkLeft = getPlatform().getTextFile( getOutputPath( "left" ), SinkMode.REPLACE );
 Tap sinkRightLeft = getPlatform().getTextFile( getOutputPath( "rightleft" ), SinkMode.REPLACE );
 Tap sinkRightRight = getPlatform().getTextFile( getOutputPath( "rightright" ), SinkMode.REPLACE );
 Pipe head = new Pipe( "split" );
 head = new Each( head, new Fields( "line" ), new RegexParser( new Fields( "ip" ), "^[^ ]*" ), new Fields( "ip" ) );
 head = new GroupBy( head, new Fields( "ip" ) );
 head = new Every( head, new Fields( "ip" ), new Count(), new Fields( "ip", "count" ) );
 head = new Each( head, new Fields( "ip" ), new RegexFilter( "^68.*" ) );
 // first split: left and right both hang off head
 Pipe left = new Each( new Pipe( "left", head ), new Fields( "ip" ), new RegexFilter( ".*46.*" ) );
 Pipe right = new Each( new Pipe( "right", head ), new Fields( "ip" ), new RegexFilter( ".*102.*" ) );
 right = new GroupBy( right, new Fields( "ip" ) );
 // second split: both branches hang off the grouped right pipe
 Pipe rightLeft = new Each( new Pipe( "rightLeft", right ), new Fields( "ip" ), new Identity() );
 Pipe rightRight = new Each( new Pipe( "rightRight", right ), new Fields( "ip" ), new Identity() );
 // parameterized maps instead of raw Map, matching what Cascades.tapsMap returns
 Map<String, Tap> sources = Cascades.tapsMap( "split", source );
 Map<String, Tap> sinks = Cascades.tapsMap( Pipe.pipes( left, rightLeft, rightRight ), Tap.taps( sinkLeft, sinkRightLeft, sinkRightRight ) );
 Flow flow = getPlatform().getFlowConnector().connect( sources, sinks, left, rightLeft, rightRight );
 flow.complete();
 validateLength( flow, 1, "left" );
 validateLength( flow, 1, "rightLeft" );
 validateLength( flow, 1, "rightRight" );
 }

代码示例来源:origin: cwensel/cascading

/**
 * Splits a parsed, grouped, counted and filtered stream into a "left" branch and a "right"
 * branch, splits the grouped "right" branch again into "rightLeft" and "rightRight", binds
 * a sink to each of the three tails, and validates a length of 1 for each named sink.
 *
 * @throws Exception if flow setup or execution fails
 */
@Test
public void testSplitMultiple() throws Exception
 {
 getPlatform().copyFromLocal( inputFileApache );
 // 46 192
 Tap source = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileApache );
 Tap sinkLeft = getPlatform().getTextFile( getOutputPath( "left" ), SinkMode.REPLACE );
 Tap sinkRightLeft = getPlatform().getTextFile( getOutputPath( "rightleft" ), SinkMode.REPLACE );
 Tap sinkRightRight = getPlatform().getTextFile( getOutputPath( "rightright" ), SinkMode.REPLACE );
 Pipe head = new Pipe( "split" );
 head = new Each( head, new Fields( "line" ), new RegexParser( new Fields( "ip" ), "^[^ ]*" ), new Fields( "ip" ) );
 head = new GroupBy( head, new Fields( "ip" ) );
 head = new Every( head, new Fields( "ip" ), new Count(), new Fields( "ip", "count" ) );
 head = new Each( head, new Fields( "ip" ), new RegexFilter( "^68.*" ) );
 // first split: left and right both hang off head
 Pipe left = new Each( new Pipe( "left", head ), new Fields( "ip" ), new RegexFilter( ".*46.*" ) );
 Pipe right = new Each( new Pipe( "right", head ), new Fields( "ip" ), new RegexFilter( ".*102.*" ) );
 right = new GroupBy( right, new Fields( "ip" ) );
 // second split: both branches hang off the grouped right pipe
 Pipe rightLeft = new Each( new Pipe( "rightLeft", right ), new Fields( "ip" ), new Identity() );
 Pipe rightRight = new Each( new Pipe( "rightRight", right ), new Fields( "ip" ), new Identity() );
 // parameterized maps instead of raw Map, matching what Cascades.tapsMap returns
 Map<String, Tap> sources = Cascades.tapsMap( "split", source );
 Map<String, Tap> sinks = Cascades.tapsMap( Pipe.pipes( left, rightLeft, rightRight ), Tap.taps( sinkLeft, sinkRightLeft, sinkRightRight ) );
 Flow flow = getPlatform().getFlowConnector().connect( sources, sinks, left, rightLeft, rightRight );
 flow.complete();
 validateLength( flow, 1, "left" );
 validateLength( flow, 1, "rightLeft" );
 validateLength( flow, 1, "rightRight" );
 }

相关文章