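This article collects code examples for the Java class cascading.pipe.Pipe and shows how the Pipe class is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they should serve as a useful reference. Details of the Pipe class:
Package path: cascading.pipe.Pipe
Class name: Pipe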
Class Pipe is used to name branches in pipe assemblies, and as a base class for core processing model types, specifically Each, Every, GroupBy, CoGroup, Merge, HashJoin, and SubAssembly.
Pipes are chained together through their constructors.
To effect a split in the pipe, simply pass a Pipe instance to two or more constructors of subsequent Pipe instances.
A join can be achieved by passing two or more Pipe instances to a CoGroup or HashJoin pipe.
A merge can be achieved by passing two or more Pipe instances to a GroupBy or Merge pipe.
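The chaining, split, and merge rules in the class description can be illustrated with a small stand-in model. The sketch below does not use the Cascading classes: `PipeChainSketch`, `Node`, and `heads()` are hypothetical names chosen for illustration, and the only behavior modeled is "a node remembers the nodes passed to its constructor".

```java
import java.util.LinkedHashSet;
import java.util.Set;

public class PipeChainSketch {
    /** Hypothetical stand-in for a pipe: a name plus the nodes it was built from. */
    public static final class Node {
        public final String name;
        public final Node[] previous;

        public Node(String name, Node... previous) {
            this.name = name;
            this.previous = previous;
        }

        /** Walks back through the constructor arguments to the nodes with no predecessor. */
        public Set<Node> heads() {
            Set<Node> heads = new LinkedHashSet<>();
            if (previous.length == 0) {
                heads.add(this);
            }
            for (Node p : previous) {
                heads.addAll(p.heads());
            }
            return heads;
        }
    }

    public static void main(String[] args) {
        Node lower = new Node("lower");
        Node upper = new Node("upper");

        // Split: the same instance is passed to two downstream constructors.
        Node left = new Node("left", lower);
        Node right = new Node("right", lower);

        // Merge or join: one downstream node receives several upstream nodes.
        Node merged = new Node("merged", left, right, upper);

        for (Node head : merged.heads()) {
            System.out.println(head.name);
        }
    }
}
```

Running `main` prints `lower` and then `upper`: the split branch is counted once because both `left` and `right` lead back to the same instance.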
Code example source: cwensel/cascading
@Test
public void testGetFirst()
{
  Pipe pipeFirst = new Pipe( "first" );
  Pipe pipe = new Pipe( pipeFirst );
  pipe = new Pipe( pipe );
  pipe = new Pipe( pipe );
  pipe = new Pipe( pipe );
  // however long the chain, its single head is the first pipe
  assertEquals( pipeFirst, pipe.getHeads()[ 0 ] );
}
Code example source: cwensel/cascading
/**
 * Method connect links the given source and sink Taps to the given pipe assembly.
 *
 * @param name   name to give the resulting Flow
 * @param source source Tap to bind to the head of the given tail Pipe
 * @param sink   sink Tap to bind to the given tail Pipe
 * @param tail   tail end of a pipe assembly
 * @return Flow
 */
public Flow connect( String name, Tap source, Tap sink, Pipe tail )
{
  Map<String, Tap> sources = new HashMap<String, Tap>();
  // bind the single source to the name of the assembly's first head
  sources.put( tail.getHeads()[ 0 ].getName(), source );
  return connect( name, sources, sink, tail );
}
Code example source: cwensel/cascading
/**
 * Constructor Splice creates a new Splice instance.
 *
 * @param lhs            of type Pipe
 * @param lhsGroupFields of type Fields
 * @param rhs            of type Pipe
 * @param rhsGroupFields of type Fields
 */
protected Splice( Pipe lhs, Fields lhsGroupFields, Pipe rhs, Fields rhsGroupFields )
{
  this( Pipe.pipes( lhs, rhs ), Fields.fields( lhsGroupFields, rhsGroupFields ) );
}
Code example source: cwensel/cascading
static Pipe resolvePrevious( Pipe pipe )
{
  if( pipe instanceof Splice || pipe instanceof Operator )
    return pipe;

  Pipe[] pipes = pipe.getPrevious();

  if( pipes.length > 1 )
    throw new IllegalStateException( "cannot resolve SubAssemblies with multiple tails at this time" );

  // pipes holds at most one element here, so the loop body runs at most once
  for( Pipe previous : pipes )
  {
    if( previous instanceof Splice || previous instanceof Operator )
      return previous;

    return resolvePrevious( previous );
  }

  return pipe;
}
Code example source: cwensel/cascading
private static void collectPipes( String name, Pipe[] tails, Set<Pipe> pipes )
{
  for( Pipe tail : tails )
  {
    if( !( tail instanceof SubAssembly ) && tail.getName().equals( name ) )
      pipes.add( tail );

    // recurse into the predecessors, unwinding any SubAssembly into its tails
    collectPipes( name, SubAssembly.unwind( tail.getPrevious() ), pipes );
  }
}
Code example source: cwensel/cascading
@Test
public void testBuildMerge()
{
  Tap sourceLower = getPlatform().getTextFile( "file1" );
  Tap sourceUpper = getPlatform().getTextFile( "file2" );

  Map sources = new HashMap();
  sources.put( "lower", sourceLower );
  sources.put( "upper", sourceUpper );

  Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );
  Tap sink = getPlatform().getTextFile( "outpath", SinkMode.REPLACE );

  Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter );
  Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter );

  // passing two pipes to GroupBy merges them before grouping on "num"
  Pipe splice = new GroupBy( "merge", Pipe.pipes( pipeLower, pipeUpper ), new Fields( "num" ), null, false );

  Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice );
}
Code example source: cwensel/cascading
void runTestCount( String name, Fields argumentSelector, Fields fieldDeclaration, Fields outputSelector ) throws Exception
{
  getPlatform().copyFromLocal( inputFileIps );

  Tap source = getPlatform().getTextFile( Fields.size( 2 ), inputFileIps );
  Tap sink = getPlatform().getTextFile( Fields.size( 1 ), getOutputPath( name ), SinkMode.REPLACE );

  Pipe pipe = new Pipe( "count" );
  pipe = new GroupBy( pipe, new Fields( 1 ) );
  pipe = new Every( pipe, argumentSelector, new Count( fieldDeclaration ), outputSelector );

  Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe );

  flow.start(); // simple test for start
  flow.complete();

  validateLength( flow, 17 );
  assertTrue( getSinkAsList( flow ).contains( new Tuple( "63.123.238.8\t2" ) ) );
}
Code example source: cwensel/cascading
@Test
public void testSinkUnknown() throws IOException
{
  getPlatform().copyFromLocal( inputFileCross );

  Tap source = getPlatform().getTextFile( new Fields( "line" ), inputFileCross );

  Pipe pipe = new Pipe( "test" );
  pipe = new Each( pipe, new RegexSplitter( new Fields( "first", "second", "third" ), "\\s" ), Fields.RESULTS );

  Tap sink = getPlatform().getTabDelimitedFile( Fields.UNKNOWN, getOutputPath( "unknownsinks" ), SinkMode.REPLACE );

  Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe );
  flow.complete();

  validateLength( flow, 37, null );

  TupleEntryIterator iterator = flow.openSink();
  String line = iterator.next().getTuple().toString();
  assertTrue( "not equal: wrong values: " + line, line.matches( "[0-9]\t[a-z]\t[A-Z]" ) );
  iterator.close();
}
Code example source: cascading/cascading-hadoop2-mr1
@Test
public void testName()
{
  Pipe count = new Pipe( "count" );
  Pipe pipe = new GroupBy( count, new Fields( 1 ) );
  pipe = new Every( pipe, new Fields( 1 ), new Count(), new Fields( 0, 1 ) );

  // pipes built on top of "count" without a name of their own report the same name
  assertEquals( "not equal: count.getName()", "count", count.getName() );
  assertEquals( "not equal: pipe.getName()", "count", pipe.getName() );

  pipe = new Each( count, new Fields( 1 ), new RegexSplitter( Fields.size( 2 ) ) );
  assertEquals( "not equal: pipe.getName()", "count", pipe.getName() );
}
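The assertions in testName suggest that a pipe created without an explicit name reports the name of the pipe it was built from. A minimal sketch of that lookup, using a hypothetical `Node` stand-in rather than the Cascading classes (the real `Pipe.getName()` may handle more cases than this model does):

```java
public class PipeNameSketch {
    /** Hypothetical stand-in: a node with an optional own name and one optional predecessor. */
    public static final class Node {
        private final String name;
        private final Node previous;

        public Node(String name) {
            this(name, null);
        }

        public Node(Node previous) {
            this(null, previous);
        }

        public Node(String name, Node previous) {
            this.name = name;
            this.previous = previous;
        }

        /** Returns the node's own name if set, otherwise the nearest named predecessor's name. */
        public String getName() {
            if (name != null) {
                return name;
            }
            return previous == null ? null : previous.getName();
        }
    }

    public static void main(String[] args) {
        Node count = new Node("count");
        Node grouped = new Node(count);            // no own name, inherits "count"
        Node renamed = new Node("merge", grouped); // an explicit name takes precedence

        System.out.println(grouped.getName());
        System.out.println(renamed.getName());
    }
}
```

Under this model the first line printed is `count` and the second is `merge`, which mirrors how the GroupBy in testBuildMerge is given the explicit name "merge".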
Code example source: cwensel/cascading
private Flow thirdFlow( Tap lhs, Tap rhs )
{
  Pipe lhsPipe = new Pipe( "lhs" );
  Pipe rhsPipe = new Pipe( "rhs" );

  // join the two branches on their first field
  Pipe pipe = new CoGroup( lhsPipe, new Fields( 0 ), rhsPipe, new Fields( 0 ), Fields.size( 2 ) );

  Tap sink = getPlatform().getTextFile( getOutputPath( "third" ), SinkMode.REPLACE );

  return getPlatform().getFlowConnector().connect( Cascades.tapsMap( Pipe.pipes( lhsPipe, rhsPipe ), Tap.taps( lhs, rhs ) ), sink, pipe );
}
Code example source: cwensel/cascading
@Test
public void testOneJob5() throws IOException
{
  Map sources = new HashMap();
  Map sinks = new HashMap();

  sources.put( "a", new Hfs( new TextLine( new Fields( "first", "second" ) ), "input/path/a" ) );
  sources.put( "b", new Hfs( new TextLine( new Fields( "third", "fourth" ) ), "input/path/b" ) );

  Pipe pipeA = new Pipe( "a" );
  Pipe pipeB = new Pipe( "b" );

  Pipe splice = new CoGroup( pipeA, pipeB );
  splice = new Each( splice, new Identity() );

  sinks.put( splice.getName(), new Hfs( new TextLine(), "output/path" ) );

  List steps = getPlatform().getFlowConnector().connect( sources, sinks, splice ).getFlowSteps();

  assertEquals( "wrong size", 1, steps.size() );

  BaseFlowStep step = (BaseFlowStep) steps.get( 0 );

  assertEquals( "not equal: step.sources.size()", 2, step.getSourceTaps().size() );
  assertNotNull( "null: step.groupBy", step.getGroup() );
  assertNotNull( "null: step.sink", step.getSink() );

  int mapDist = ElementGraphs.shortestDistance( step.getElementGraph(), (FlowElement) step.getSourceTaps().iterator().next(), step.getGroup() );
  assertEquals( "not equal: mapDist", 1, mapDist );

  int reduceDist = ElementGraphs.shortestDistance( step.getElementGraph(), step.getGroup(), step.getSink() );
  assertEquals( "not equal: reduceDist", 2, reduceDist );
}
Code example source: cascading/cascading-bind
private String[] makeNames( Pipe[] pipes )
{
  String[] names = new String[ pipes.length ];

  for( int i = 0; i < pipes.length; i++ )
    names[ i ] = pipes[ i ].getName();

  return names;
}
Code example source: cwensel/cascading
@Test
public void testNestedAssembliesAccessors() throws IOException
{
  Pipe pipe = new Pipe( "test" );
  pipe = new SecondAssembly( pipe );

  Pipe[] allPrevious = pipe.getPrevious();
  assertEquals( "wrong number of previous", 1, allPrevious.length );

  // for( Pipe previous : allPrevious )
  //   assertFalse( previous instanceof PipeAssembly );

  Pipe[] heads = pipe.getHeads();
  assertEquals( "wrong number of heads", 1, heads.length );

  for( Pipe head : heads )
    assertFalse( head instanceof SubAssembly );
}
Code example source: cwensel/cascading
private static void collectNames( Pipe[] pipes, Set<String> names )
{
  for( Pipe pipe : pipes )
  {
    if( pipe instanceof SubAssembly )
      names.addAll( asList( ( (SubAssembly) pipe ).getTailNames() ) );
    else
      names.add( pipe.getName() );

    collectNames( SubAssembly.unwind( pipe.getPrevious() ), names );
  }
}
Code example source: cwensel/cascading
/**
 * Creates a new GroupBy instance that will first merge the given pipes, then group on the given groupFields field names.
 *
 * @param groupName   of type String
 * @param lhsPipe     of type Pipe
 * @param rhsPipe     of type Pipe
 * @param groupFields of type Fields
 */
public GroupBy( String groupName, Pipe lhsPipe, Pipe rhsPipe, Fields groupFields )
{
  super( groupName, Pipe.pipes( lhsPipe, rhsPipe ), groupFields );
}
Code example source: cwensel/cascading
/**
 * Method getHeads returns the first Pipe instances in this pipe assembly.
 *
 * @return the first (type Pipe[]) of this Pipe object.
 */
public Pipe[] getHeads()
{
  Pipe[] pipes = getPrevious();

  // no predecessor: this pipe is itself a head
  if( pipes.length == 0 )
    return new Pipe[]{this};

  // single predecessor: delegate up the chain
  if( pipes.length == 1 )
    return pipes[ 0 ].getHeads();

  // several predecessors: collect their heads, de-duplicated by the Set
  Set<Pipe> heads = new HashSet<Pipe>();

  for( Pipe pipe : pipes )
    Collections.addAll( heads, pipe.getHeads() );

  return heads.toArray( new Pipe[ heads.size() ] );
}
Code example source: cascading/cascading-bind
protected Map<String, Tap> getSourceTapsMap( Pipe... sinkPipes )
{
  Set<Pipe> sourcePipesSet = new HashSet<Pipe>();

  for( Pipe pipe : sinkPipes )
    Collections.addAll( sourcePipesSet, pipe.getHeads() );

  Pipe[] sourcePipes = sourcePipesSet.toArray( new Pipe[ sourcePipesSet.size() ] );
  String[] names = makeNames( sourcePipes );

  return Cascades.tapsMap( sourcePipes, getSourceTapsFor( names ) );
}
Code example source: cwensel/cascading
public static Integer findOrdinal( Pipe pipe, Pipe previous )
{
  Pipe[] previousPipes = pipe.getPrevious();

  TreeMap<Integer, Integer> sorted = new TreeMap<>();

  for( int i = 0; i < previousPipes.length; i++ )
  {
    int result = isPrevious( previousPipes[ i ], (Pipe) previous );

    if( result == -1 )
      continue;

    sorted.put( result, i );
  }

  // the entry with the smallest key holds the ordinal to return
  return sorted.firstEntry().getValue();
}
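findOrdinal relies on a TreeMap to pick the entry with the smallest key. The sketch below isolates that selection step with plain integers. It assumes, since the excerpt does not show isPrevious, that each result is a non-negative distance or -1 for "not reachable"; `NearestOrdinalSketch` and `nearestOrdinal` are hypothetical names. Unlike the excerpt, it returns null instead of dereferencing `firstEntry()` when nothing matched.

```java
import java.util.TreeMap;

public class NearestOrdinalSketch {
    /**
     * Returns the index of the smallest distance that is not -1, or null when every
     * entry is -1. The distances array is a hypothetical stand-in for the values
     * that isPrevious would return for each predecessor.
     */
    public static Integer nearestOrdinal(int[] distances) {
        TreeMap<Integer, Integer> sorted = new TreeMap<>();
        for (int i = 0; i < distances.length; i++) {
            if (distances[i] == -1) {
                continue; // this predecessor does not lead to the target
            }
            sorted.put(distances[i], i); // key: distance, value: ordinal
        }
        // firstEntry() returns null on an empty map, so guard before dereferencing.
        return sorted.isEmpty() ? null : sorted.firstEntry().getValue();
    }

    public static void main(String[] args) {
        System.out.println(nearestOrdinal(new int[] {-1, 3, 1})); // prints 2
        System.out.println(nearestOrdinal(new int[] {-1, -1}));   // prints null
    }
}
```

Whether a null return or an exception is the better response to "no predecessor matched" depends on the caller; the guard here only makes the empty case explicit.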