[English] A Flow is a logical unit of work declared by an assembly of cascading.pipe.Pipe instances connected to source and sink Tap instances.
A Flow is then executed to push the incoming source data through the assembly into one or more sinks.
In most cases a Flow subclass instance should not be instantiated directly; see the subclasses of the FlowConnector class for the supported platforms.
Note that cascading.pipe.Pipe assemblies can be reused in multiple Flow instances. They maintain no state regarding the Flow execution. Consequently, cascading.pipe.Pipe assemblies can be given parameters through their calling Flow, so they can be built in a generic fashion.
When a Flow is created, an optimized internal representation is created that is then executed on the underlying execution platform. This is typically done by creating one or more FlowStep instances.
Flows are submitted in order of dependency when used with a cascading.cascade.Cascade. If two or more Flows do not share the same dependencies and can all be scheduled simultaneously, the #getSubmitPriority() value determines the order in which they are submitted for execution. The default submit priority is 5.
Use a FlowListener to receive events on the life cycle of the Flow as it executes. Any Tap instances owned by the Flow that also implement FlowListener are automatically added to the set of listeners.
Code example source: origin: twitter/ambrose
* The onStarting event is fired when a Flow instance receives the start() message. A Flow is broken
* down into executing units called stepFlow. A stepFlow contains a stepFlowJob, which represents
* the MapReduce job to be submitted to Hadoop. The Ambrose graph is constructed from the step
* graph found in the flow object.
* @param flow the flow.
public void onStarting(Flow flow) {
// NOTE(review): excerpt — the closing braces, the body of the try block below, and any use of
// `props` and `converter` appear to have been elided; confirm against the original source.
// init flow
List<FlowStep> steps = flow.getFlowSteps();
// the number of flow steps is recorded as the total number of jobs
totalNumberOfJobs = steps.size();
currentFlowId = flow.getID();
// `props` is unused in the visible lines; presumably it was used to set up statsWriteService
Properties props = new Properties();
try {
} catch (IOException e) {
// the IOException is logged; no rethrow is visible in this excerpt
LOG.error("Failed to initialize statsWriteService", e);
// convert graph from cascading to ambrose
// the converter reads the flow's step graph and is handed the shared nodesByName map
AmbroseCascadingGraphConverter converter =
new AmbroseCascadingGraphConverter(Flows.getStepGraphFrom(flow), nodesByName);
// send the DAG node-name map for the current flow id through statsWriteService
AmbroseUtils.sendDagNodeNameMap(statsWriteService, currentFlowId, nodesByName);
Code example source: origin: LiveRamp/cascading_ext
// NOTE(review): only the signature of stop() survives in this excerpt; its body was elided.
public void stop() {
Code example source: origin: cwensel/cascading
// Runs two chained Every/TestAggregator pipes over the "ip" groups of the Apache log input
// and validates the number of output tuples.
// NOTE(review): braces were stripped from this excerpt.
public void testGeneratorAggregator() throws Exception
getPlatform().copyFromLocal( inputFileApache );
Tap source = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileApache );
Pipe pipe = new Pipe( "test" );
// the regex "^[^ ]*" takes the leading run of non-space characters of each line as "ip"
pipe = new Each( pipe, new Fields( "line" ), new RegexParser( new Fields( "ip" ), "^[^ ]*" ), new Fields( "ip" ) );
pipe = new GroupBy( pipe, new Fields( "ip" ) );
// the first aggregator is given two tuples, the second one three
pipe = new Every( pipe, new TestAggregator( new Fields( "count1" ), new Fields( "ip" ), new Tuple( "first1" ), new Tuple( "first2" ) ) );
pipe = new Every( pipe, new TestAggregator( new Fields( "count2" ), new Fields( "ip" ), new Tuple( "second" ), new Tuple( "second2" ), new Tuple( "second3" ) ) );
Tap sink = getPlatform().getTextFile( getOutputPath( "generatoraggregator" ), SinkMode.REPLACE );
Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe );
// 8 * 2 * 3: presumably 8 distinct "ip" groups, each producing 2 x 3 output tuples — confirm against the input file
validateLength( flow, 8 * 2 * 3, null );
Code example source: origin: cwensel/cascading
// NOTE(review): excerpt starts mid-method; `sources`, `sink`, `splitter`, `listener` and `startTime`
// are presumably declared in elided lines (as is any copy of inputFileLower).
getPlatform().copyFromLocal( inputFileUpper );
Tap sourceLower = new Hfs( new TextLine( new Fields( "offset", "line" ) ), inputFileLower );
Tap sourceUpper = new Hfs( new TextLine( new Fields( "offset", "line" ) ), inputFileUpper );
// split each line with `splitter`, then group each branch on "num"
Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter );
pipeLower = new GroupBy( pipeLower, new Fields( "num" ) );
Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter );
pipeUpper = new GroupBy( pipeUpper, new Fields( "num" ) );
// join the lower and upper branches on "num"; the result declares four fields
Pipe splice = new CoGroup( pipeLower, new Fields( "num" ), pipeUpper, new Fields( "num" ), Fields.size( 4 ) );
final Flow flow = getPlatform().getFlowConnector( getProperties() ).connect( sources, sink, splice );
flow.addListener( listener );
// returns the elapsed time in nanoseconds since `startTime`
return System.nanoTime() - startTime;
Code example source: origin: cwensel/cascading
/**
 * Groups the IP input on field position 1, applies a Count aggregator with the given
 * selectors, and validates the sink output.
 *
 * @param name name used to build the output path
 * @param argumentSelector argument selector passed to the Every pipe
 * @param fieldDeclaration fields declared by the Count aggregator
 * @param outputSelector output selector passed to the Every pipe
 * @throws Exception propagated from the platform and flow calls
 */
void runTestCount( String name, Fields argumentSelector, Fields fieldDeclaration, Fields outputSelector ) throws Exception
getPlatform().copyFromLocal( inputFileIps );
Tap source = getPlatform().getTextFile( Fields.size( 2 ), inputFileIps );
Tap sink = getPlatform().getTextFile( Fields.size( 1 ), getOutputPath( name ), SinkMode.REPLACE );
Pipe pipe = new Pipe( "count" );
pipe = new GroupBy( pipe, new Fields( 1 ) );
pipe = new Every( pipe, argumentSelector, new Count( fieldDeclaration ), outputSelector );
Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe );
flow.start(); // simple test for start
// expect 17 output lines, one of which is the tuple "\t2"
validateLength( flow, 17 );
assertTrue( getSinkAsList( flow ).contains( new Tuple( "\t2" ) ) );
Code example source: origin: cwensel/cascading
// Plans a GroupBy that merges the "left" and "right" branches and checks that the planner
// produces a single step. The flow is only connected here; no start/complete call is visible.
public void testMerge()
Tap source1 = new Hfs( new TextLine( new Fields( "offset", "line" ) ), "foo/merge1" );
Tap source2 = new Hfs( new TextLine( new Fields( "offset", "line" ) ), "foo/merge2" );
Tap sink = new Hfs( new TextLine(), "foo" );
// each branch filters lines by its own regex (presumably keeping the matches)
Pipe left = new Each( new Pipe( "left" ), new Fields( "line" ), new RegexFilter( ".*46.*" ) );
Pipe right = new Each( new Pipe( "right" ), new Fields( "line" ), new RegexFilter( ".*192.*" ) );
Pipe merge = new GroupBy( "merge", Pipe.pipes( left, right ), new Fields( "offset" ) );
// source and sink maps are keyed by pipe name ("left"/"right" heads, "merge" tail)
// NOTE(review): raw Map/HashMap types; Map<String, Tap> would be type-safe
Map sources = new HashMap();
sources.put( "left", source1 );
sources.put( "right", source2 );
Map sinks = new HashMap();
sinks.put( "merge", sink );
Flow flow = getPlatform().getFlowConnector().connect( sources, sinks, merge );
List<FlowStep> steps = flow.getFlowSteps();
assertEquals( "not equal: steps.size()", 1, steps.size() );
Code example source: origin: cwensel/cascading
// Checks that calling stop() on a connected flow does not fail.
// NOTE(review): the test name suggests a start/stop race, but no flow.start() call is visible in
// this excerpt — confirm whether it was elided before flow.stop().
public void testStartStopRace() throws Exception
getPlatform().copyFromLocal( inputFileLower );
Tap sourceLower = new Hfs( new TextLine( new Fields( "offset", "line" ) ), inputFileLower );
Map sources = new HashMap();
sources.put( "lower", sourceLower );
// split each line on a single space into "num" and "char"
Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );
// using null pos so all fields are written
Tap sink = new Hfs( new TextLine(), getOutputPath( "startstop" ), SinkMode.REPLACE );
Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter );
pipeLower = new GroupBy( pipeLower, new Fields( "num" ) );
Flow flow = getPlatform().getFlowConnector( getProperties() ).connect( sources, sink, pipeLower );
flow.stop(); // should not fail
Code example source: origin: cwensel/cascading
// Attaches a LockingFlowListener and waits up to 90 seconds for its `completed` permit.
// NOTE(review): no flow.start() call is visible here; presumably it was elided between
// addListener and the wait — confirm against the original source.
public void testStartWithoutComplete() throws Exception
getPlatform().copyFromLocal( inputFileLower );
Tap sourceLower = new Hfs( new TextLine( new Fields( "offset", "line" ) ), inputFileLower );
Map sources = new HashMap();
sources.put( "lower", sourceLower );
Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );
// using null pos so all fields are written
Tap sink = new Hfs( new TextLine(), getOutputPath( "withoutcomplete" ), SinkMode.REPLACE );
Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter );
pipeLower = new GroupBy( pipeLower, new Fields( "num" ) );
Flow flow = getPlatform().getFlowConnector( getProperties() ).connect( sources, sink, pipeLower );
LockingFlowListener listener = new LockingFlowListener();
flow.addListener( listener );
// `completed` is presumably a Semaphore released by the listener when the flow completes
assertTrue( listener.completed.tryAcquire( 90, TimeUnit.SECONDS ) );
Code example source: origin: cwensel/cascading
// Chains three flows (source -> success -> without -> sink) that share one
// FailOnMissingSuccessFlowListener, deletes the "_SUCCESS" file under the `without` path,
// and expects the third flow to be failed with a FlowException.
// NOTE(review): the `try`, the flow run calls and the braces were elided in this excerpt.
public void testFailOnMissingSuccessFlowListener() throws Exception
getPlatform().copyFromLocal( inputFileLower );
FlowListener listener = new FailOnMissingSuccessFlowListener();
Hfs source = new Hfs( new TextLine( new Fields( "offset", "line" ) ), inputFileLower );
Hfs success = new Hfs( new TextLine(), getOutputPath( "withsuccess" ), SinkMode.REPLACE );
Hfs without = new Hfs( new TextLine(), getOutputPath( "withoutsuccess" ), SinkMode.REPLACE );
Hfs sink = new Hfs( new TextLine(), getOutputPath( "final" ), SinkMode.REPLACE );
Flow firstFlow = getPlatform().getFlowConnector( getProperties() ).connect( source, success, new Pipe( "lower" ) );
firstFlow.addListener( listener );
Flow secondFlow = getPlatform().getFlowConnector( getProperties() ).connect( success, without, new Pipe( "lower" ) );
secondFlow.addListener( listener );
// delete the _SUCCESS file (presumably written by the second flow) so the third flow's input lacks it
Hfs successTap = new Hfs( new TextLine(), new Path( without.getPath(), "_SUCCESS" ).toString() );
assertTrue( successTap.deleteResource( getPlatform().getFlowProcess() ) );
Flow thirdFlow = getPlatform().getFlowConnector( getProperties() ).connect( without, sink, new Pipe( "lower" ) );
thirdFlow.addListener( listener );
// if the (elided) run of thirdFlow does not throw, the test fails here
fail( "listener did not fail flow" );
catch( FlowException exception )
// a FlowException is the expected outcome
// do nothing
Code example source: origin: cascading/cascading-hadoop2-mr1
// Plans a CoGroup over a single pipe and checks that the planner produces one step.
// NOTE(review): the `1` argument to CoGroup is presumably the number of self-joins — confirm
// against the CoGroup constructor documentation.
public void testDupeSourceRepeat()
Tap source1 = new Hfs( new TextLine( new Fields( "offset", "line" ) ), "foo/merge" );
Tap sink = new Hfs( new TextLine(), "foo" );
Pipe pipe = new Pipe( "pipe" );
Pipe merge = new CoGroup( "cogroup", pipe, new Fields( "offset" ), 1, Fields.size( 4 ) );
// source and sink maps are keyed by pipe name ("pipe" head, "cogroup" tail)
Map sources = new HashMap();
sources.put( "pipe", source1 );
Map sinks = new HashMap();
sinks.put( "cogroup", sink );
Flow flow = getPlatform().getFlowConnector().connect( sources, sinks, merge );
List<FlowStep> steps = flow.getFlowSteps();
assertEquals( "not equal: steps.size()", 1, steps.size() );
Code example source: origin: cwensel/cascading
/** Tests that the pipe graph is assembled correctly without throwing an internal error. */
public void testPipeAssembly()
// TestAssembly is defined elsewhere in the test sources; its output is grouped on "ip"
Pipe pipe = new TestAssembly( "test" );
pipe = new GroupBy( pipe, new Fields( "ip" ) );
Tap source = getPlatform().getTextFile( "foo" );
Tap sink = getPlatform().getTextFile( "foo/split1", SinkMode.REPLACE );
// planning only: no start/complete call is visible; a single step is expected
List<FlowStep> steps = getPlatform().getFlowConnector().connect( source, sink, pipe ).getFlowSteps();
assertEquals( "not equal: steps.size()", 1, steps.size() );
Code example source: origin: cwensel/cascading
// Checks that the initialized config of a single-step Hfs -> Hfs flow is not in local mode;
// only meaningful when the platform uses a cluster.
// NOTE(review): the statement guarded by the `if` below (presumably `return;`) was elided in this
// excerpt; as written, the following declaration would become the if-body and would not compile.
public void testNotLocalMode() throws Exception
if( !getPlatform().isUseCluster() )
Tap source = new Hfs( new TextLine(), "input/path" );
Tap sink = new Hfs( new TextLine(), "output/path", SinkMode.REPLACE );
Pipe pipe = new Pipe( "test" );
Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe );
List<FlowStep> steps = flow.getFlowSteps();
assertEquals( "wrong size", 1, steps.size() );
FlowStep step = steps.get( 0 );
// build the step's config from the flow process and the platform's Hadoop configuration, then ask HadoopUtil whether it is local
boolean isLocal = HadoopUtil.isLocal( (Configuration) ( (BaseFlowStep) step ).createInitializedConfig( flow.getFlowProcess(), ( (BaseHadoopPlatform) getPlatform() ).getConfiguration() ) );
assertTrue( "is local", !isLocal );
Code example source: origin: cwensel/cascading
// Checks that a flow writing to an Lfs sink is planned as a single step whose config is local.
public void testLocalModeSink() throws Exception
Tap source = new Hfs( new TextLine(), "input/path" );
// Lfs sink, in contrast to the Hfs source
Tap sink = new Lfs( new TextLine(), "output/path", SinkMode.REPLACE );
Pipe pipe = new Pipe( "test" );
Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe );
List<FlowStep> steps = flow.getFlowSteps();
assertEquals( "wrong size", 1, steps.size() );
FlowStep step = steps.get( 0 );
// reads the step's own config directly (no createInitializedConfig call here)
boolean isLocal = HadoopUtil.isLocal( (Configuration) step.getConfig() );
assertTrue( "is not local", isLocal );
Code example source: origin: cwensel/cascading
// Builds a flow via flowWithCounters with a fixed counter map and checks that its FlowStats has
// exactly one child ProcessStepStats reporting the same counter groups, counter names and value.
public void testProcessFlowWithCounters() throws IOException
getPlatform().copyFromLocal( inputFileIps );
// counters: group "outer-key" -> { "inner-key" -> 42 }
Map<String, Map<String, Long>> counters = new HashMap<String, Map<String, Long>>();
Map<String, Long> innerMap = new HashMap<String, Long>();
innerMap.put( "inner-key", 42L );
counters.put( "outer-key", innerMap );
Flow process = flowWithCounters( "counter", counters );
FlowStats flowStats = process.getFlowStats();
assertNotNull( flowStats );
// NOTE(review): raw List/ArrayList types; the cast below relies on the child being a ProcessStepStats
List children = new ArrayList( flowStats.getChildren() );
assertEquals( 1, children.size() );
ProcessStepStats stepStats = (ProcessStepStats) children.get( 0 );
assertEquals( counters.keySet(), stepStats.getCounterGroups() );
assertEquals( innerMap.keySet(), stepStats.getCountersFor( "outer-key" ) );
assertEquals( 42L, stepStats.getCounterValue( "outer-key", "inner-key" ) );
Code example source: origin: cwensel/cascading
// Expects a CascadingException from a flow built to fail at FailingRiffle.Failing.COMPLETE, and
// checks that the registered ThrowableListener captured a throwable.
// NOTE(review): the `try`, the call that runs the flow, and the braces were elided in this excerpt.
public void testProcessFlowFlowListenerExceptionHandlingInComplete() throws IOException, InterruptedException
ThrowableListener listener = new ThrowableListener();
getPlatform().copyFromLocal( inputFileIps );
String path = "completeException";
Flow process = flowWithException( path, FailingRiffle.Failing.COMPLETE );
process.addListener( listener );
// reaching this line means the (elided) run did not throw
fail( "there should have been an exception" );
catch( CascadingException exception )
assertNotNull( listener.getThrowable() );
Code example source: origin: cwensel/cascading
/**
 * Returns a stream of tuples from the sink registered under {@code name}, built by
 * {@code TupleStream.tupleStream} with this flow's FlowProcess and {@code selector}.
 * NOTE(review): "Copy" in the name suggests each tuple is a copy — confirm in TupleStream.
 *
 * @param name the sink name passed to {@code getSink}
 * @param selector the fields to select
 * @return a stream of tuples read from the sink
 */
default Stream<Tuple> getSinkTupleStreamCopy( String name, Fields selector )
return TupleStream.tupleStream( getSink( name ), getFlowProcess(), selector );
Code example source: origin: LiveRamp/cascading_ext
// Attaches a StopListener (sharing the `isComplete` flag) and a JobRecordListener to `flow`, then
// throws a RuntimeException if the flow terminated without completing, unless skipCompleteListener is set.
// NOTE(review): the JobRecordListener arguments, the call that runs the flow, and the braces are
// elided here; `persister` and `failOnCounterFetch` are presumably passed to JobRecordListener.
public void complete(JobPersister persister, boolean failOnCounterFetch) {
AtomicBoolean isComplete = new AtomicBoolean(false);
// presumably StopListener sets isComplete when the flow completes normally
flow.addListener(new StopListener(isComplete));
flow.addStepListener(new JobRecordListener(
// TODO kill skipCompleteListener once we figure out the cascading internal NPE (upgrade past 2.5.1 maybe?)
if (!isComplete.get() && !skipCompleteListener) {
throw new RuntimeException("Flow terminated but did not complete! Possible shutdown hook invocation.");
Code example source: origin: cwensel/cascading
// Expects a CascadingException from a flow built to fail at FailingRiffle.Failing.STOP, and
// checks that the registered ThrowableListener captured a throwable.
// NOTE(review): the `try`, the calls that run/stop the flow, and the braces were elided in this excerpt.
public void testProcessFlowFlowListenerExceptionHandlingInStop() throws IOException, InterruptedException
ThrowableListener listener = new ThrowableListener();
getPlatform().copyFromLocal( inputFileIps );
String path = "stopException";
Flow process = flowWithException( path, FailingRiffle.Failing.STOP );
process.addListener( listener );
// reaching this line means the (elided) calls did not throw
fail( "there should have been an exception" );
catch( CascadingException exception )
assertNotNull( listener.getThrowable() );
Code example source: origin: org.springframework.data/spring-cascading
// Creates the flow, then (per the visible conditionals) handles skipStrategy, stepStrategy,
// listeners, writeDOT, writeStepsDOT and priority when each is set.
// Presumably the Spring InitializingBean callback — confirm against the enclosing class.
// NOTE(review): the bodies of the conditionals and the loop below were elided in this excerpt.
public void afterPropertiesSet() throws Exception {
flow = createFlow();
if (skipStrategy != null) {
if (stepStrategy != null) {
if (listeners != null) {
for (FlowListener listener : listeners) {
if (StringUtils.hasText(writeDOT)) {
if (StringUtils.hasText(writeStepsDOT)) {
if (priority != null) {
Code example source: origin: LiveRamp/cascading_ext
/**
 * Returns the counters of {@code group} for {@code flow} by delegating to the overload that
 * takes the flow's FlowStats.
 *
 * @param flow the flow whose stats are read
 * @param group the counter group name
 * @return the counters in the group, as returned by the FlowStats overload
 * @throws IOException presumably propagated from the FlowStats overload
 */
public static List<Counter> getCountersForGroup(Flow flow, String group) throws IOException {
return getCountersForGroup(flow.getFlowStats(), group);