本文整理了Java中cascading.tap.Tap.taps()
方法的一些代码示例,展示了Tap.taps()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Tap.taps()
方法的具体详情如下:
包路径:cascading.tap.Tap
类名称:Tap
方法名:taps
[英]Convenience function to make an array of Tap instances.
[中]方便功能,用于创建一个Tap实例数组。
代码示例来源:origin: cwensel/cascading
/**
* Method tapsMap creates a new Map for the given name and tap.
*
* @param name of type String
* @param tap of type Tap
* @return Map
*/
public static Map<String, Tap> tapsMap( String name, Tap tap )
{
return tapsMap( new String[]{name}, Tap.taps( tap ) );
}
代码示例来源:origin: cwensel/cascading
/**
* Method tapsMap creates a new Map using the given Pipe name and tap.
*
* @param pipe of type Pipe
* @param tap of type Tap
* @return Map
*/
public static Map<String, Tap> tapsMap( Pipe pipe, Tap tap )
{
return tapsMap( Pipe.pipes( pipe ), Tap.taps( tap ) );
}
代码示例来源:origin: cwensel/cascading
@Test
public void testMultiTapCascade() throws IOException
{
getPlatform().copyFromLocal( inputFileIps );
String path = "multitap";
Flow first = previousMultiTapFlow( path, "first" );
Flow second = previousMultiTapFlow( path, "second" );
Flow multitap = multiTapFlow( Tap.taps( first.getSink(), second.getSink() ), path );
Cascade cascade = new CascadeConnector( getProperties() ).connect( multitap, first, second );
cascade.start();
cascade.complete();
validateLength( multitap, 40 );
}
代码示例来源:origin: cascading/cascading-platform
@Test
public void testMultiTapCascade() throws IOException
{
getPlatform().copyFromLocal( inputFileIps );
String path = "multitap";
Flow first = previousMultiTapFlow( path, "first" );
Flow second = previousMultiTapFlow( path, "second" );
Flow multitap = multiTapFlow( Tap.taps( first.getSink(), second.getSink() ), path );
Cascade cascade = new CascadeConnector( getProperties() ).connect( multitap, first, second );
cascade.start();
cascade.complete();
validateLength( multitap, 40 );
}
代码示例来源:origin: cwensel/cascading
private Flow thirdFlow( Tap lhs, Tap rhs )
{
Pipe lhsPipe = new Pipe( "lhs" );
Pipe rhsPipe = new Pipe( "rhs" );
Pipe pipe = new CoGroup( lhsPipe, new Fields( 0 ), rhsPipe, new Fields( 0 ), Fields.size( 2 ) );
Tap sink = getPlatform().getTextFile( getOutputPath( "third" ), SinkMode.REPLACE );
return getPlatform().getFlowConnector().connect( Cascades.tapsMap( Pipe.pipes( lhsPipe, rhsPipe ), Tap.taps( lhs, rhs ) ), sink, pipe );
}
代码示例来源:origin: cascading/cascading-platform
private Flow thirdFlow( Tap lhs, Tap rhs )
{
Pipe lhsPipe = new Pipe( "lhs" );
Pipe rhsPipe = new Pipe( "rhs" );
Pipe pipe = new CoGroup( lhsPipe, new Fields( 0 ), rhsPipe, new Fields( 0 ), Fields.size( 2 ) );
Tap sink = getPlatform().getTextFile( getOutputPath( "third" ), SinkMode.REPLACE );
return getPlatform().getFlowConnector().connect( Cascades.tapsMap( Pipe.pipes( lhsPipe, rhsPipe ), Tap.taps( lhs, rhs ) ), sink, pipe );
}
代码示例来源:origin: cwensel/cascading
@Test
public void testUniqueMerge() throws IOException
{
getPlatform().copyFromLocal( inputFileLhs );
getPlatform().copyFromLocal( inputFileRhs );
Tap sourceLhs = getPlatform().getTextFile( inputFileLhs );
Tap sourceRhs = getPlatform().getTextFile( inputFileRhs );
Tap sink = getPlatform().getTextFile( new Fields( "item" ), new Fields( "num", "char" ), getOutputPath( "uniquemerge-nondeterministic" ), SinkMode.REPLACE );
Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );
Pipe lhsPipe = new Pipe( "lhs" );
lhsPipe = new Each( lhsPipe, new Fields( "line" ), splitter );
Pipe rhsPipe = new Pipe( "rhs" );
rhsPipe = new Each( rhsPipe, new Fields( "line" ), splitter );
Pipe pipe = new Unique( Pipe.pipes( lhsPipe, rhsPipe ), new Fields( "num" ) );
Map<String, Tap> sources = Cascades.tapsMap( Pipe.pipes( lhsPipe, rhsPipe ), Tap.taps( sourceLhs, sourceRhs ) );
Flow flow = getPlatform().getFlowConnector().connect( sources, sink, pipe );
flow.complete();
validateLength( flow, 5, 1, Pattern.compile( "^\\d+\\s\\w+$" ) );
}
代码示例来源:origin: cascading/cascading-platform
@Test
public void testUniqueMerge() throws IOException
{
getPlatform().copyFromLocal( inputFileLhs );
getPlatform().copyFromLocal( inputFileRhs );
Tap sourceLhs = getPlatform().getTextFile( inputFileLhs );
Tap sourceRhs = getPlatform().getTextFile( inputFileRhs );
Tap sink = getPlatform().getTextFile( new Fields( "item" ), new Fields( "num", "char" ), getOutputPath( "uniquemerge-nondeterministic" ), SinkMode.REPLACE );
Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );
Pipe lhsPipe = new Pipe( "lhs" );
lhsPipe = new Each( lhsPipe, new Fields( "line" ), splitter );
Pipe rhsPipe = new Pipe( "rhs" );
rhsPipe = new Each( rhsPipe, new Fields( "line" ), splitter );
Pipe pipe = new Unique( Pipe.pipes( lhsPipe, rhsPipe ), new Fields( "num" ) );
Map<String, Tap> sources = Cascades.tapsMap( Pipe.pipes( lhsPipe, rhsPipe ), Tap.taps( sourceLhs, sourceRhs ) );
Flow flow = getPlatform().getFlowConnector().connect( sources, sink, pipe );
flow.complete();
validateLength( flow, 5, 1, Pattern.compile( "^\\d+\\s\\w+$" ) );
}
代码示例来源:origin: cwensel/cascading
@Test
public void testDupeTailNames() throws Exception
{
Tap source = getPlatform().getTextFile( inputFileJoined );
Tap sink = getPlatform().getTextFile( getOutputPath( "unknown" ), SinkMode.REPLACE );
Pipe pipe = new Pipe( "test" );
pipe = new Each( pipe, new Fields( "line" ), new RegexSplitter( " " ) );
Pipe group = new GroupBy( pipe, Fields.size( 3 ) );
Pipe lhs = new Pipe( "tail", group );
lhs = new Each( group, new Fields( "line" ), new RegexSplitter( " " ) );
Pipe rhs = new Pipe( "tail", group );
rhs = new Each( group, new Fields( "line" ), new RegexSplitter( " " ) );
Map<String, Tap> sinks = Cascades.tapsMap( Pipe.pipes( lhs, rhs ), Tap.taps( sink, sink ) );
try
{
getPlatform().getFlowConnector().connect( source, sinks, Pipe.pipes( lhs, rhs ) );
fail( "did not fail on dupe head names" );
}
catch( Exception exception )
{
// ignore
}
}
代码示例来源:origin: cwensel/cascading
@Test
public void testCountMerge() throws IOException
{
getPlatform().copyFromLocal( inputFileLhs );
getPlatform().copyFromLocal( inputFileRhs );
Tap lhs = getPlatform().getDelimitedFile( new Fields( "num", "char" ), " ", inputFileLhs );
Tap rhs = getPlatform().getDelimitedFile( new Fields( "num", "char" ), " ", inputFileRhs );
Tap sink = getPlatform().getDelimitedFile( new Fields( "char", "count" ), "\t",
new Class[]{String.class, Integer.TYPE}, getOutputPath( "mergecount" ), SinkMode.REPLACE );
Pipe lhsPipe = new Pipe( "count-lhs" );
Pipe rhsPipe = new Pipe( "count-rhs" );
rhsPipe = new Each( rhsPipe, new Fields( "char" ), new ExpressionFunction( Fields.ARGS, "$0.toLowerCase()", String.class ), Fields.REPLACE );
Pipe countPipe = new CountBy( Pipe.pipes( lhsPipe, rhsPipe ), new Fields( "char" ), new Fields( "count" ), 2 );
Map<String, Tap> tapMap = Cascades.tapsMap( Pipe.pipes( lhsPipe, rhsPipe ), Tap.taps( lhs, rhs ) );
Flow flow = getPlatform().getFlowConnector().connect( tapMap, sink, countPipe );
flow.complete();
validateLength( flow, 5, 2, Pattern.compile( "^\\w+\\s\\d+$" ) );
Tuple[] results = new Tuple[]{
new Tuple( "a", 4 ),
new Tuple( "b", 8 ),
new Tuple( "c", 8 ),
new Tuple( "d", 4 ),
new Tuple( "e", 2 ),
};
TupleEntryIterator iterator = flow.openSink();
int count = 0;
while( iterator.hasNext() )
assertEquals( results[ count++ ], iterator.next().getTuple() );
iterator.close();
}
代码示例来源:origin: cascading/cascading-platform
@Test
public void testCountMerge() throws IOException
{
getPlatform().copyFromLocal( inputFileLhs );
getPlatform().copyFromLocal( inputFileRhs );
Tap lhs = getPlatform().getDelimitedFile( new Fields( "num", "char" ), " ", inputFileLhs );
Tap rhs = getPlatform().getDelimitedFile( new Fields( "num", "char" ), " ", inputFileRhs );
Tap sink = getPlatform().getDelimitedFile( new Fields( "char", "count" ), "\t",
new Class[]{String.class, Integer.TYPE}, getOutputPath( "mergecount" ), SinkMode.REPLACE );
Pipe lhsPipe = new Pipe( "count-lhs" );
Pipe rhsPipe = new Pipe( "count-rhs" );
rhsPipe = new Each( rhsPipe, new Fields( "char" ), new ExpressionFunction( Fields.ARGS, "$0.toLowerCase()", String.class ), Fields.REPLACE );
Pipe countPipe = new CountBy( Pipe.pipes( lhsPipe, rhsPipe ), new Fields( "char" ), new Fields( "count" ), 2 );
Map<String, Tap> tapMap = Cascades.tapsMap( Pipe.pipes( lhsPipe, rhsPipe ), Tap.taps( lhs, rhs ) );
Flow flow = getPlatform().getFlowConnector().connect( tapMap, sink, countPipe );
flow.complete();
validateLength( flow, 5, 2, Pattern.compile( "^\\w+\\s\\d+$" ) );
Tuple[] results = new Tuple[]{
new Tuple( "a", 4 ),
new Tuple( "b", 8 ),
new Tuple( "c", 8 ),
new Tuple( "d", 4 ),
new Tuple( "e", 2 ),
};
TupleEntryIterator iterator = flow.openSink();
int count = 0;
while( iterator.hasNext() )
assertEquals( results[ count++ ], iterator.next().getTuple() );
iterator.close();
}
代码示例来源:origin: cascading/cascading-platform
@Test
public void testDupeTailNames() throws Exception
{
Tap source = getPlatform().getTextFile( inputFileJoined );
Tap sink = getPlatform().getTextFile( getOutputPath( "unknown" ), SinkMode.REPLACE );
Pipe pipe = new Pipe( "test" );
pipe = new Each( pipe, new Fields( "line" ), new RegexSplitter( " " ) );
Pipe group = new GroupBy( pipe, Fields.size( 3 ) );
Pipe lhs = new Pipe( "tail", group );
lhs = new Each( group, new Fields( "line" ), new RegexSplitter( " " ) );
Pipe rhs = new Pipe( "tail", group );
rhs = new Each( group, new Fields( "line" ), new RegexSplitter( " " ) );
Map<String, Tap> sinks = Cascades.tapsMap( Pipe.pipes( lhs, rhs ), Tap.taps( sink, sink ) );
try
{
getPlatform().getFlowConnector().connect( source, sinks, Pipe.pipes( lhs, rhs ) );
fail( "did not fail on dupe head names" );
}
catch( Exception exception )
{
// ignore
}
}
代码示例来源:origin: cwensel/cascading
@Test
public void testSumMerge() throws IOException
{
getPlatform().copyFromLocal( inputFileLhs );
getPlatform().copyFromLocal( inputFileRhs );
Tap lhs = getPlatform().getDelimitedFile( new Fields( "num", "char" ), " ", inputFileLhs );
Tap rhs = getPlatform().getDelimitedFile( new Fields( "num", "char" ), " ", inputFileRhs );
Tap sink = getPlatform().getDelimitedFile( new Fields( "char", "sum" ), "\t",
new Class[]{String.class, Integer.TYPE}, getOutputPath( "mergesum" ), SinkMode.REPLACE );
Pipe lhsPipe = new Pipe( "sum-lhs" );
Pipe rhsPipe = new Pipe( "sum-rhs" );
rhsPipe = new Each( rhsPipe, new Fields( "char" ), new ExpressionFunction( Fields.ARGS, "$0.toLowerCase()", String.class ), Fields.REPLACE );
Pipe sumPipe = new SumBy( Pipe.pipes( lhsPipe, rhsPipe ), new Fields( "char" ), new Fields( "num" ), new Fields( "sum" ), long.class, 2 );
Map<String, Tap> tapMap = Cascades.tapsMap( Pipe.pipes( lhsPipe, rhsPipe ), Tap.taps( lhs, rhs ) );
Flow flow = getPlatform().getFlowConnector().connect( tapMap, sink, sumPipe );
flow.complete();
validateLength( flow, 5, 2, Pattern.compile( "^\\w+\\s\\d+$" ) );
Tuple[] results = new Tuple[]{
new Tuple( "a", 12 ),
new Tuple( "b", 24 ),
new Tuple( "c", 20 ),
new Tuple( "d", 12 ),
new Tuple( "e", 10 ),
};
TupleEntryIterator iterator = flow.openSink();
int count = 0;
while( iterator.hasNext() )
assertEquals( results[ count++ ], iterator.next().getTuple() );
iterator.close();
}
代码示例来源:origin: cascading/cascading-platform
@Test
public void testAverageMerge() throws IOException
{
getPlatform().copyFromLocal( inputFileLhs );
getPlatform().copyFromLocal( inputFileRhs );
Tap lhs = getPlatform().getDelimitedFile( new Fields( "num", "char" ), " ", inputFileLhs );
Tap rhs = getPlatform().getDelimitedFile( new Fields( "num", "char" ), " ", inputFileRhs );
Tap sink = getPlatform().getDelimitedFile( new Fields( "char", "average" ), "\t",
new Class[]{String.class, Double.TYPE}, getOutputPath( "mergeaverage" ), SinkMode.REPLACE );
Pipe lhsPipe = new Pipe( "average-lhs" );
Pipe rhsPipe = new Pipe( "average-rhs" );
rhsPipe = new Each( rhsPipe, new Fields( "char" ), new ExpressionFunction( Fields.ARGS, "$0.toLowerCase()", String.class ), Fields.REPLACE );
Pipe sumPipe = new AverageBy( Pipe.pipes( lhsPipe, rhsPipe ), new Fields( "char" ), new Fields( "num" ), new Fields( "average" ), 2 );
Map<String, Tap> tapMap = Cascades.tapsMap( Pipe.pipes( lhsPipe, rhsPipe ), Tap.taps( lhs, rhs ) );
Flow flow = getPlatform().getFlowConnector().connect( tapMap, sink, sumPipe );
flow.complete();
validateLength( flow, 5, 2, Pattern.compile( "^\\w+\\s[\\d.]+$" ) );
Tuple[] results = new Tuple[]{
new Tuple( "a", (double) 12 / 4 ),
new Tuple( "b", (double) 24 / 8 ),
new Tuple( "c", (double) 20 / 8 ),
new Tuple( "d", (double) 12 / 4 ),
new Tuple( "e", (double) 10 / 2 ),
};
TupleEntryIterator iterator = flow.openSink();
int count = 0;
while( iterator.hasNext() )
assertEquals( results[ count++ ], iterator.next().getTuple() );
iterator.close();
}
代码示例来源:origin: cwensel/cascading
/**
* If the sinks have the same scheme as a temp tap, replace the temp tap
*
* @throws Exception
*/
@Test
public void testChainedTaps() throws Exception
{
getPlatform().copyFromLocal( inputFileApache );
Tap source = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileApache );
Pipe pipe = new Each( new Pipe( "first" ), new Fields( "line" ), new RegexParser( new Fields( "ip" ), "^[^ ]*" ), new Fields( "ip" ) );
pipe = new GroupBy( pipe, new Fields( "ip" ) );
pipe = new Each( new Pipe( "second", pipe ), new Fields( "ip" ), new RegexFilter( "7" ) );
pipe = new GroupBy( pipe, new Fields( "ip" ) );
pipe = new Each( new Pipe( "third", pipe ), new Fields( "ip" ), new RegexFilter( "6" ) );
pipe = new GroupBy( pipe, new Fields( "ip" ) );
Tap sinkFirst = getPlatform().getTabDelimitedFile( new Fields( "ip" ), getOutputPath( "chainedtaps/first" ), SinkMode.REPLACE );
Tap sinkSecond = getPlatform().getTabDelimitedFile( new Fields( "ip" ), getOutputPath( "chainedtaps/second" ), SinkMode.REPLACE );
Tap sinkThird = getPlatform().getTabDelimitedFile( new Fields( "ip" ), getOutputPath( "chainedtaps/third" ), SinkMode.REPLACE );
Map<String, Tap> sinks = Cascades.tapsMap( new String[]{"first", "second",
"third"}, Tap.taps( sinkFirst, sinkSecond, sinkThird ) );
FlowConnector flowConnector = getPlatform().getFlowConnector();
Flow flow = flowConnector.connect( source, sinks, pipe );
if( getPlatform().isMapReduce() )
assertEquals( "wrong number of steps", 3, flow.getFlowSteps().size() );
flow.complete();
validateLength( flow, 3 );
}
}
代码示例来源:origin: cwensel/cascading
Map<String, Tap> sources = Cascades.tapsMap( heads, Tap.taps( source, source ) );
Map<String, Tap> sinks = Cascades.tapsMap( tails, Tap.taps( innerSink, outerSink, leftSink, rightSink ) );
代码示例来源:origin: cwensel/cascading
@Test
public void testSplitComplex() throws Exception
{
getPlatform().copyFromLocal( inputFileApache );
// 46 192
Tap source = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileApache );
Tap sink1 = getPlatform().getTextFile( getOutputPath( "splitcomp1" ), SinkMode.REPLACE );
Tap sink2 = getPlatform().getTextFile( getOutputPath( "splitcomp2" ), SinkMode.REPLACE );
Pipe pipe = new Pipe( "split" );
pipe = new Each( pipe, new Fields( "line" ), new RegexParser( new Fields( "ip" ), "^[^ ]*" ), new Fields( "ip" ) );
pipe = new GroupBy( pipe, new Fields( "ip" ) );
pipe = new Every( pipe, new Fields( "ip" ), new Count(), new Fields( "ip", "count" ) );
pipe = new Each( pipe, new Fields( "ip" ), new RegexFilter( "^68.*" ) );
Pipe left = new Each( new Pipe( "left", pipe ), new Fields( "ip" ), new RegexFilter( ".*46.*" ) );
Pipe right = new Each( new Pipe( "right", pipe ), new Fields( "ip" ), new RegexFilter( ".*102.*" ) );
Map sources = Cascades.tapsMap( "split", source );
Map sinks = Cascades.tapsMap( Pipe.pipes( left, right ), Tap.taps( sink1, sink2 ) );
Flow flow = getPlatform().getFlowConnector().connect( sources, sinks, left, right );
flow.complete();
validateLength( flow, 1, "left" );
validateLength( flow, 1, "right" );
}
代码示例来源:origin: cascading/cascading-platform
@Test
public void testSplitComplex() throws Exception
{
getPlatform().copyFromLocal( inputFileApache );
// 46 192
Tap source = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileApache );
Tap sink1 = getPlatform().getTextFile( getOutputPath( "splitcomp1" ), SinkMode.REPLACE );
Tap sink2 = getPlatform().getTextFile( getOutputPath( "splitcomp2" ), SinkMode.REPLACE );
Pipe pipe = new Pipe( "split" );
pipe = new Each( pipe, new Fields( "line" ), new RegexParser( new Fields( "ip" ), "^[^ ]*" ), new Fields( "ip" ) );
pipe = new GroupBy( pipe, new Fields( "ip" ) );
pipe = new Every( pipe, new Fields( "ip" ), new Count(), new Fields( "ip", "count" ) );
pipe = new Each( pipe, new Fields( "ip" ), new RegexFilter( "^68.*" ) );
Pipe left = new Each( new Pipe( "left", pipe ), new Fields( "ip" ), new RegexFilter( ".*46.*" ) );
Pipe right = new Each( new Pipe( "right", pipe ), new Fields( "ip" ), new RegexFilter( ".*102.*" ) );
Map sources = Cascades.tapsMap( "split", source );
Map sinks = Cascades.tapsMap( Pipe.pipes( left, right ), Tap.taps( sink1, sink2 ) );
Flow flow = getPlatform().getFlowConnector().connect( sources, sinks, left, right );
flow.complete();
validateLength( flow, 1, "left" );
validateLength( flow, 1, "right" );
}
代码示例来源:origin: cascading/cascading-platform
@Test
public void testSplitMultiple() throws Exception
{
getPlatform().copyFromLocal( inputFileApache );
// 46 192
Tap source = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileApache );
Tap sinkLeft = getPlatform().getTextFile( getOutputPath( "left" ), SinkMode.REPLACE );
Tap sinkRightLeft = getPlatform().getTextFile( getOutputPath( "rightleft" ), SinkMode.REPLACE );
Tap sinkRightRight = getPlatform().getTextFile( getOutputPath( "rightright" ), SinkMode.REPLACE );
Pipe head = new Pipe( "split" );
head = new Each( head, new Fields( "line" ), new RegexParser( new Fields( "ip" ), "^[^ ]*" ), new Fields( "ip" ) );
head = new GroupBy( head, new Fields( "ip" ) );
head = new Every( head, new Fields( "ip" ), new Count(), new Fields( "ip", "count" ) );
head = new Each( head, new Fields( "ip" ), new RegexFilter( "^68.*" ) );
Pipe left = new Each( new Pipe( "left", head ), new Fields( "ip" ), new RegexFilter( ".*46.*" ) );
Pipe right = new Each( new Pipe( "right", head ), new Fields( "ip" ), new RegexFilter( ".*102.*" ) );
right = new GroupBy( right, new Fields( "ip" ) );
Pipe rightLeft = new Each( new Pipe( "rightLeft", right ), new Fields( "ip" ), new Identity() );
Pipe rightRight = new Each( new Pipe( "rightRight", right ), new Fields( "ip" ), new Identity() );
Map sources = Cascades.tapsMap( "split", source );
Map sinks = Cascades.tapsMap( Pipe.pipes( left, rightLeft, rightRight ), Tap.taps( sinkLeft, sinkRightLeft, sinkRightRight ) );
Flow flow = getPlatform().getFlowConnector().connect( sources, sinks, left, rightLeft, rightRight );
flow.complete();
validateLength( flow, 1, "left" );
validateLength( flow, 1, "rightLeft" );
validateLength( flow, 1, "rightRight" );
}
代码示例来源:origin: cwensel/cascading
@Test
public void testSplitMultiple() throws Exception
{
getPlatform().copyFromLocal( inputFileApache );
// 46 192
Tap source = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileApache );
Tap sinkLeft = getPlatform().getTextFile( getOutputPath( "left" ), SinkMode.REPLACE );
Tap sinkRightLeft = getPlatform().getTextFile( getOutputPath( "rightleft" ), SinkMode.REPLACE );
Tap sinkRightRight = getPlatform().getTextFile( getOutputPath( "rightright" ), SinkMode.REPLACE );
Pipe head = new Pipe( "split" );
head = new Each( head, new Fields( "line" ), new RegexParser( new Fields( "ip" ), "^[^ ]*" ), new Fields( "ip" ) );
head = new GroupBy( head, new Fields( "ip" ) );
head = new Every( head, new Fields( "ip" ), new Count(), new Fields( "ip", "count" ) );
head = new Each( head, new Fields( "ip" ), new RegexFilter( "^68.*" ) );
Pipe left = new Each( new Pipe( "left", head ), new Fields( "ip" ), new RegexFilter( ".*46.*" ) );
Pipe right = new Each( new Pipe( "right", head ), new Fields( "ip" ), new RegexFilter( ".*102.*" ) );
right = new GroupBy( right, new Fields( "ip" ) );
Pipe rightLeft = new Each( new Pipe( "rightLeft", right ), new Fields( "ip" ), new Identity() );
Pipe rightRight = new Each( new Pipe( "rightRight", right ), new Fields( "ip" ), new Identity() );
Map sources = Cascades.tapsMap( "split", source );
Map sinks = Cascades.tapsMap( Pipe.pipes( left, rightLeft, rightRight ), Tap.taps( sinkLeft, sinkRightLeft, sinkRightRight ) );
Flow flow = getPlatform().getFlowConnector().connect( sources, sinks, left, rightLeft, rightRight );
flow.complete();
validateLength( flow, 1, "left" );
validateLength( flow, 1, "rightLeft" );
validateLength( flow, 1, "rightRight" );
}
内容来源于网络,如有侵权,请联系作者删除!