
x33g5p2x  于2022-01-26 转载在 其他  



[英]Class Pipe is used to name branches in pipe assemblies, and as a base class for core processing model types, specifically Each, Every, GroupBy, CoGroup, Merge, HashJoin, and SubAssembly.

Pipes are chained together through their constructors.

To effect a split in the pipe, simply pass a Pipe instance to two or more constructors of subsequent Pipe instances.

A join can be achieved by passing two or more Pipe instances to a CoGroup or HashJoin pipe.

A merge can be achieved by passing two or more Pipe instances to a GroupBy or Merge pipe.


代码示例来源:origin: cwensel/cascading

public void testGetFirst()
 Pipe pipeFirst = new Pipe( "first" );
 Pipe pipe = new Pipe( pipeFirst );
 pipe = new Pipe( pipe );
 pipe = new Pipe( pipe );
 pipe = new Pipe( pipe );
 assertEquals( pipeFirst, pipe.getHeads()[ 0 ] );

代码示例来源:origin: cwensel/cascading

 * Method connect links the given source and sink Taps to the given pipe assembly.
 * @param name   name to give the resulting Flow
 * @param source source Tap to bind to the head of the given tail Pipe
 * @param sink   sink Tap to bind to the given tail Pipe
 * @param tail   tail end of a pipe assembly
 * @return Flow
public Flow connect( String name, Tap source, Tap sink, Pipe tail )
 Map<String, Tap> sources = new HashMap<String, Tap>();
 sources.put( tail.getHeads()[ 0 ].getName(), source );
 return connect( name, sources, sink, tail );

代码示例来源:origin: cwensel/cascading

 /**
  * Constructor Splice creates a new Splice instance.
  * <p>
  * The two pipes and the two grouping field sets are packed into arrays with
  * {@code Pipe.pipes} and {@code Fields.fields}, then handed to the array-based constructor.
  *
  * @param lhs            of type Pipe
  * @param lhsGroupFields of type Fields
  * @param rhs            of type Pipe
  * @param rhsGroupFields of type Fields
  */
 protected Splice( Pipe lhs, Fields lhsGroupFields, Pipe rhs, Fields rhsGroupFields ) {
   this( Pipe.pipes( lhs, rhs ), Fields.fields( lhsGroupFields, rhsGroupFields ) );
 }

代码示例来源:origin: cwensel/cascading

static Pipe resolvePrevious( Pipe pipe )
 if( pipe instanceof Splice || pipe instanceof Operator )
  return pipe;
 Pipe[] pipes = pipe.getPrevious();
 if( pipes.length > 1 )
  throw new IllegalStateException( "cannot resolve SubAssemblies with multiple tails at this time" );
 for( Pipe previous : pipes )
  if( previous instanceof Splice || previous instanceof Operator )
   return previous;
  return resolvePrevious( previous );
 return pipe;

代码示例来源:origin: cwensel/cascading

private static void collectPipes( String name, Pipe[] tails, Set<Pipe> pipes )
 for( Pipe tail : tails )
  if( !( tail instanceof SubAssembly ) && tail.getName().equals( name ) )
   pipes.add( tail );
  collectPipes( name, SubAssembly.unwind( tail.getPrevious() ), pipes );

代码示例来源:origin: cwensel/cascading

public void testBuildMerge()
 Tap sourceLower = getPlatform().getTextFile( "file1" );
 Tap sourceUpper = getPlatform().getTextFile( "file2" );
 Map sources = new HashMap();
 sources.put( "lower", sourceLower );
 sources.put( "upper", sourceUpper );
 Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );
 Tap sink = getPlatform().getTextFile( "outpath", SinkMode.REPLACE );
 Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter );
 Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter );
 Pipe splice = new GroupBy( "merge", Pipe.pipes( pipeLower, pipeUpper ), new Fields( "num" ), null, false );
 Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice );

代码示例来源:origin: cwensel/cascading

void runTestCount( String name, Fields argumentSelector, Fields fieldDeclaration, Fields outputSelector ) throws Exception
 getPlatform().copyFromLocal( inputFileIps );
 Tap source = getPlatform().getTextFile( Fields.size( 2 ), inputFileIps );
 Tap sink = getPlatform().getTextFile( Fields.size( 1 ), getOutputPath( name ), SinkMode.REPLACE );
 Pipe pipe = new Pipe( "count" );
 pipe = new GroupBy( pipe, new Fields( 1 ) );
 pipe = new Every( pipe, argumentSelector, new Count( fieldDeclaration ), outputSelector );
 Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe );
 flow.start(); // simple test for start
 validateLength( flow, 17 );
 assertTrue( getSinkAsList( flow ).contains( new Tuple( "\t2" ) ) );

代码示例来源:origin: cwensel/cascading

public void testSinkUnknown() throws IOException
 getPlatform().copyFromLocal( inputFileCross );
 Tap source = getPlatform().getTextFile( new Fields( "line" ), inputFileCross );
 Pipe pipe = new Pipe( "test" );
 pipe = new Each( pipe, new RegexSplitter( new Fields( "first", "second", "third" ), "\\s" ), Fields.RESULTS );
 Tap sink = getPlatform().getTabDelimitedFile( Fields.UNKNOWN, getOutputPath( "unknownsinks" ), SinkMode.REPLACE );
 Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe );
 validateLength( flow, 37, null );
 TupleEntryIterator iterator = flow.openSink();
 String line =;
 assertTrue( "not equal: wrong values: " + line, line.matches( "[0-9]\t[a-z]\t[A-Z]" ) );

代码示例来源:origin: cascading/cascading-hadoop2-mr1

public void testName()
 Pipe count = new Pipe( "count" );
 Pipe pipe = new GroupBy( count, new Fields( 1 ) );
 pipe = new Every( pipe, new Fields( 1 ), new Count(), new Fields( 0, 1 ) );
 assertEquals( "not equal: count.getName()", "count", count.getName() );
 assertEquals( "not equal: pipe.getName()", "count", pipe.getName() );
 pipe = new Each( count, new Fields( 1 ), new RegexSplitter( Fields.size( 2 ) ) );
 assertEquals( "not equal: pipe.getName()", "count", pipe.getName() );

代码示例来源:origin: cwensel/cascading

private Flow thirdFlow( Tap lhs, Tap rhs )
 Pipe lhsPipe = new Pipe( "lhs" );
 Pipe rhsPipe = new Pipe( "rhs" );
 Pipe pipe = new CoGroup( lhsPipe, new Fields( 0 ), rhsPipe, new Fields( 0 ), Fields.size( 2 ) );
 Tap sink = getPlatform().getTextFile( getOutputPath( "third" ), SinkMode.REPLACE );
 return getPlatform().getFlowConnector().connect( Cascades.tapsMap( Pipe.pipes( lhsPipe, rhsPipe ), Tap.taps( lhs, rhs ) ), sink, pipe );

代码示例来源:origin: cwensel/cascading

public void testOneJob5() throws IOException
 Map sources = new HashMap();
 Map sinks = new HashMap();
 sources.put( "a", new Hfs( new TextLine( new Fields( "first", "second" ) ), "input/path/a" ) );
 sources.put( "b", new Hfs( new TextLine( new Fields( "third", "fourth" ) ), "input/path/b" ) );
 Pipe pipeA = new Pipe( "a" );
 Pipe pipeB = new Pipe( "b" );
 Pipe splice = new CoGroup( pipeA, pipeB );
 splice = new Each( splice, new Identity() );
 sinks.put( splice.getName(), new Hfs( new TextLine(), "output/path" ) );
 List steps = getPlatform().getFlowConnector().connect( sources, sinks, splice ).getFlowSteps();
 assertEquals( "wrong size", 1, steps.size() );
 BaseFlowStep step = (BaseFlowStep) steps.get( 0 );
 assertEquals( "not equal: step.sources.size()", 2, step.getSourceTaps().size() );
 assertNotNull( "null: step.groupBy", step.getGroup() );
 assertNotNull( "null: step.sink", step.getSink() );
 int mapDist = ElementGraphs.shortestDistance( step.getElementGraph(), (FlowElement) step.getSourceTaps().iterator().next(), step.getGroup() );
 assertEquals( "not equal: mapDist", 1, mapDist );
 int reduceDist = ElementGraphs.shortestDistance( step.getElementGraph(), step.getGroup(), step.getSink() );
 assertEquals( "not equal: reduceDist", 2, reduceDist );

代码示例来源:origin: cascading/cascading-bind

private String[] makeNames( Pipe[] pipes )
 String[] names = new String[ pipes.length ];
 for( int i = 0; i < pipes.length; i++ )
  names[ i ] = pipes[ i ].getName();
 return names;

代码示例来源:origin: cwensel/cascading

 public void testNestedAssembliesAccessors() throws IOException
  Pipe pipe = new Pipe( "test" );

  pipe = new SecondAssembly( pipe );

  Pipe[] allPrevious = pipe.getPrevious();

  assertEquals( "wrong number of previous", 1, allPrevious.length );

//    for( Pipe previous : allPrevious )
//      assertFalse( previous instanceof PipeAssembly );

  Pipe[] heads = pipe.getHeads();

  assertEquals( "wrong number of heads", 1, heads.length );

  for( Pipe head : heads )
   assertFalse( head instanceof SubAssembly );


代码示例来源:origin: cwensel/cascading

private static void collectNames( Pipe[] pipes, Set<String> names )
 for( Pipe pipe : pipes )
  if( pipe instanceof SubAssembly )
   names.addAll( asList( ( (SubAssembly) pipe ).getTailNames() ) );
   names.add( pipe.getName() );
  collectNames( SubAssembly.unwind( pipe.getPrevious() ), names );

代码示例来源:origin: cwensel/cascading

 /**
  * Creates a new GroupBy instance that will first merge the given pipes, then group on the given groupFields field names.
  * <p>
  * The two pipes are packed into an array with {@code Pipe.pipes} and passed to the
  * superclass constructor.
  *
  * @param groupName   of type String
  * @param lhsPipe     of type Pipe
  * @param rhsPipe     of type Pipe
  * @param groupFields of type Fields
  */
 public GroupBy( String groupName, Pipe lhsPipe, Pipe rhsPipe, Fields groupFields ) {
   super( groupName, Pipe.pipes( lhsPipe, rhsPipe ), groupFields );
 }

代码示例来源:origin: cwensel/cascading

 * Method getHeads returns the first Pipe instances in this pipe assembly.
 * @return the first (type Pipe[]) of this Pipe object.
public Pipe[] getHeads()
 Pipe[] pipes = getPrevious();
 if( pipes.length == 0 )
  return new Pipe[]{this};
 if( pipes.length == 1 )
  return pipes[ 0 ].getHeads();
 Set<Pipe> heads = new HashSet<Pipe>();
 for( Pipe pipe : pipes )
  Collections.addAll( heads, pipe.getHeads() );
 return heads.toArray( new Pipe[ heads.size() ] );

代码示例来源:origin: cascading/cascading-bind

protected Map<String, Tap> getSourceTapsMap( Pipe... sinkPipes )
 Set<Pipe> sourcePipesSet = new HashSet<Pipe>();
 for( Pipe pipe : sinkPipes )
  Collections.addAll( sourcePipesSet, pipe.getHeads() );
 Pipe[] sourcePipes = sourcePipesSet.toArray( new Pipe[ sourcePipesSet.size() ] );
 String[] names = makeNames( sourcePipes );
 return Cascades.tapsMap( sourcePipes, getSourceTapsFor( names ) );

代码示例来源:origin: cwensel/cascading

public static Integer findOrdinal( Pipe pipe, Pipe previous )
 Pipe[] previousPipes = pipe.getPrevious();
 TreeMap<Integer, Integer> sorted = new TreeMap<>();
 for( int i = 0; i < previousPipes.length; i++ )
  int result = isPrevious( previousPipes[ i ], (Pipe) previous );
  if( result == -1 )
  sorted.put( result, i );
 return sorted.firstEntry().getValue();

代码示例来源:origin: cascading/cascading-platform

public void testBuildMerge()
 Tap sourceLower = getPlatform().getTextFile( "file1" );
 Tap sourceUpper = getPlatform().getTextFile( "file2" );
 Map sources = new HashMap();
 sources.put( "lower", sourceLower );
 sources.put( "upper", sourceUpper );
 Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );
 Tap sink = getPlatform().getTextFile( "outpath", SinkMode.REPLACE );
 Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter );
 Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter );
 Pipe splice = new GroupBy( "merge", Pipe.pipes( pipeLower, pipeUpper ), new Fields( "num" ), null, false );
 Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice );

代码示例来源:origin: cascading/cascading-platform

void runTestCount( String name, Fields argumentSelector, Fields fieldDeclaration, Fields outputSelector ) throws Exception
 getPlatform().copyFromLocal( inputFileIps );
 Tap source = getPlatform().getTextFile( Fields.size( 2 ), inputFileIps );
 Tap sink = getPlatform().getTextFile( Fields.size( 1 ), getOutputPath( name ), SinkMode.REPLACE );
 Pipe pipe = new Pipe( "count" );
 pipe = new GroupBy( pipe, new Fields( 1 ) );
 pipe = new Every( pipe, argumentSelector, new Count( fieldDeclaration ), outputSelector );
 Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe );
 flow.start(); // simple test for start
 validateLength( flow, 17 );
 assertTrue( getSinkAsList( flow ).contains( new Tuple( "\t2" ) ) );
