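This article collects code examples for the Java class cascading.flow.Flow and shows how the Flow class is used in practice. The examples come mainly from GitHub, Stack Overflow, Maven and similar platforms. They were extracted from selected projects and should serve as a useful reference. Details of the Flow class: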
Package: cascading.flow.Flow
Class name: Flow
A Flow is a logical unit of work declared by an assembly of cascading.pipe.Pipe instances connected to source and sink Tap instances.
A Flow is then executed to push the incoming source data through the assembly into one or more sinks.
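The source → assembly → sink shape described above can be modelled in plain Java. The sketch below is a hypothetical, library-free stand-in, not Cascading API. The names `MiniFlow`, `source`, `assembly` and `sink` are invented for illustration. The assembly is a function over a stream of records, and "completing" the flow pushes every source record through it into the sink.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Hypothetical model of a Flow: source records -> pipe assembly -> sink. Not the Cascading API. */
class MiniFlow {
    private final List<String> source;                               // stands in for a source Tap
    private final Function<Stream<String>, Stream<String>> assembly; // stands in for a Pipe assembly
    private final List<String> sink = new ArrayList<>();             // stands in for a sink Tap

    MiniFlow(List<String> source, Function<Stream<String>, Stream<String>> assembly) {
        this.source = source;
        this.assembly = assembly;
    }

    /** "Executes" the flow: pushes all source data through the assembly into the sink. */
    List<String> complete() {
        sink.clear();
        sink.addAll(assembly.apply(source.stream()).collect(Collectors.toList()));
        return sink;
    }

    public static void main(String[] args) {
        MiniFlow flow = new MiniFlow(
            List.of("a b", "c", "d e f"),
            lines -> lines.map(String::toUpperCase).filter(l -> l.contains(" ")));
        System.out.println(flow.complete()); // prints [A B, D E F]
    }
}
```

In real Cascading the taps read and write external storage, and the assembly is planned rather than applied directly. The sketch only illustrates the direction of data flow.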
In most cases a Flow sub-class instance cannot be instantiated directly; see the sub-classes of the FlowConnector class for the supported platforms.
Note that cascading.pipe.Pipe assemblies can be reused in multiple Flow instances. They maintain no state regarding the Flow execution. Consequently, cascading.pipe.Pipe assemblies can be given parameters through their calling Flow, so they can be built in a generic fashion.
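The reuse rule above can be illustrated with a stateless "assembly" shared by two flows. Each flow supplies its own parameters. This is a hypothetical sketch: `ReusableAssembly`, `GREP`, `runFlow` and the `"pattern"` key are invented names. A plain `Map` stands in for whatever configuration a real Flow passes down to its pipes.

```java
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.stream.Collectors;

/** Hypothetical sketch: one stateless assembly reused by two flows with different parameters. */
class ReusableAssembly {
    /** Stateless: the output depends only on the input records and the calling flow's parameters. */
    static final BiFunction<Map<String, String>, List<String>, List<String>> GREP =
        (params, records) -> records.stream()
            .filter(r -> r.matches(params.get("pattern")))
            .collect(Collectors.toList());

    /** Stands in for running a flow: the flow hands its own parameters to the shared assembly. */
    static List<String> runFlow(Map<String, String> flowParams, List<String> source) {
        return GREP.apply(flowParams, source);
    }

    public static void main(String[] args) {
        List<String> logs = List.of("GET /index", "POST /login", "GET /about");
        System.out.println(runFlow(Map.of("pattern", "GET.*"), logs));  // prints [GET /index, GET /about]
        System.out.println(runFlow(Map.of("pattern", "POST.*"), logs)); // prints [POST /login]
    }
}
```

The assembly holds no state. Running one flow therefore cannot affect another flow that shares it.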
When a Flow is created, an optimized internal representation is created that is then executed on the underlying execution platform. This is typically done by creating one or more FlowStep instances.
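The following heavily simplified, hypothetical model shows how a MapReduce-style planner might cut a *linear* pipe chain into steps: every grouping pipe (GroupBy or CoGroup) needs its own reduce phase, and a chain with no grouping still runs as one map-only step. Several test examples later in this article assert exactly one FlowStep for a single GroupBy or a bare Pipe, which is consistent with the model. The real Cascading planner, however, also handles branches, joins and optimizations that this sketch ignores. `StepCountModel` is an invented name.

```java
import java.util.List;

/** Hypothetical, simplified step-count model for a linear chain on a MapReduce-like platform. */
class StepCountModel {
    /** Each GroupBy/CoGroup starts a new reduce phase; no grouping still means one map-only step. */
    static int estimateSteps(List<String> linearChain) {
        long groupings = linearChain.stream()
            .filter(p -> p.equals("GroupBy") || p.equals("CoGroup"))
            .count();
        return (int) Math.max(1, groupings);
    }

    public static void main(String[] args) {
        System.out.println(estimateSteps(List.of("Pipe")));                                         // prints 1
        System.out.println(estimateSteps(List.of("Each", "GroupBy", "Every")));                     // prints 1
        System.out.println(estimateSteps(List.of("Each", "GroupBy", "Every", "GroupBy", "Every"))); // prints 2
    }
}
```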
Flows are submitted in order of dependency when used with a cascading.cascade.Cascade. If two or more steps do not share the same dependencies and all can be scheduled simultaneously, the getSubmitPriority() value determines the order in which the steps are submitted for execution. The default submit priority is 5.
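The ordering rule above can be sketched as a stable sort of the ready-to-run steps by priority, using the default of 5 given in the Javadoc. The sketch assumes that lower values are submitted first (1 highest, 10 lowest); check the setSubmitPriority Javadoc before relying on that. `SubmitOrder` and `Step` are hypothetical names, not Cascading classes.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Hypothetical model of ordering independent, simultaneously schedulable steps by submit priority. */
class SubmitOrder {
    static final int DEFAULT_PRIORITY = 5; // default submit priority stated in the Flow Javadoc

    static final class Step {
        final String name;
        final int priority;
        Step(String name, int priority) { this.name = name; this.priority = priority; }
        Step(String name) { this(name, DEFAULT_PRIORITY); }
    }

    /** Returns step names in submission order. ASSUMPTION: a lower number is submitted earlier. */
    static List<String> submissionOrder(List<Step> ready) {
        List<Step> sorted = new ArrayList<>(ready);
        sorted.sort(Comparator.comparingInt((Step s) -> s.priority)); // List.sort is stable: ties keep input order
        List<String> names = new ArrayList<>();
        for (Step s : sorted) names.add(s.name);
        return names;
    }

    public static void main(String[] args) {
        List<Step> ready = List.of(new Step("report"), new Step("urgent", 1), new Step("cleanup", 9));
        System.out.println(submissionOrder(ready)); // prints [urgent, report, cleanup]
    }
}
```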
Use the FlowListener to receive any events on the life-cycle of the Flow as it executes. Any Tap instances owned by the Flow also implementing FlowListener will automatically be added to the set of listeners.
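The listener rule above, including the automatic registration of taps that also implement the listener interface, can be modelled in a few classes. The callback names `onStarting` and `onCompleted` echo cascading.flow.FlowListener. However, `DemoListener`, `DemoTap`, `ListeningTap` and `ListenerFlow` are invented stand-ins, not Cascading classes.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a lifecycle listener; method names echo Cascading's FlowListener. */
interface DemoListener {
    void onStarting(String flowName);
    void onCompleted(String flowName);
}

/** Hypothetical tap with an identifier. */
class DemoTap {
    final String id;
    DemoTap(String id) { this.id = id; }
}

/** A tap that also wants lifecycle events; it records them for inspection. */
class ListeningTap extends DemoTap implements DemoListener {
    final List<String> events = new ArrayList<>();
    ListeningTap(String id) { super(id); }
    public void onStarting(String flowName) { events.add("starting " + flowName); }
    public void onCompleted(String flowName) { events.add("completed " + flowName); }
}

class ListenerFlow {
    final String name;
    final List<DemoListener> listeners = new ArrayList<>();

    ListenerFlow(String name, List<DemoTap> taps) {
        this.name = name;
        // taps owned by the flow that also implement the listener interface are registered automatically
        for (DemoTap tap : taps)
            if (tap instanceof DemoListener) listeners.add((DemoListener) tap);
    }

    void addListener(DemoListener listener) { listeners.add(listener); }

    /** Fires the lifecycle callbacks around the (omitted) real work. */
    void complete() {
        for (DemoListener l : listeners) l.onStarting(name);
        for (DemoListener l : listeners) l.onCompleted(name);
    }

    public static void main(String[] args) {
        ListeningTap sink = new ListeningTap("sink");
        ListenerFlow flow = new ListenerFlow("demo", List.of(new DemoTap("source"), sink));
        flow.complete();
        System.out.println(sink.events); // prints [starting demo, completed demo]
    }
}
```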
Code example origin: twitter/ambrose
/**
* The onStarting event is fired when a Flow instance receives the start() message. A Flow is cut
* down into executing units called stepFlow. A stepFlow contains a stepFlowJob which represents
* the mapreduce job to be submitted to Hadoop. The ambrose graph is constructed from the step
* graph found in flow object.
*
* @param flow the flow.
*/
@Override
@SuppressWarnings("unchecked")
public void onStarting(Flow flow) {
// init flow
List<FlowStep> steps = flow.getFlowSteps();
totalNumberOfJobs = steps.size();
currentFlowId = flow.getID();
Properties props = new Properties();
props.putAll(flow.getConfigAsProperties());
try {
statsWriteService.initWriteService(props);
} catch (IOException e) {
LOG.error("Failed to initialize statsWriteService", e);
}
// convert graph from cascading to ambrose
AmbroseCascadingGraphConverter converter =
new AmbroseCascadingGraphConverter(Flows.getStepGraphFrom(flow), nodesByName);
converter.convert();
AmbroseUtils.sendDagNodeNameMap(statsWriteService, currentFlowId, nodesByName);
}
Code example origin: LiveRamp/cascading_ext
@Override
public void stop() {
flow.stop();
}
Code example origin: cwensel/cascading
@Test
public void testGeneratorAggregator() throws Exception
{
getPlatform().copyFromLocal( inputFileApache );
Tap source = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileApache );
Pipe pipe = new Pipe( "test" );
pipe = new Each( pipe, new Fields( "line" ), new RegexParser( new Fields( "ip" ), "^[^ ]*" ), new Fields( "ip" ) );
pipe = new GroupBy( pipe, new Fields( "ip" ) );
pipe = new Every( pipe, new TestAggregator( new Fields( "count1" ), new Fields( "ip" ), new Tuple( "first1" ), new Tuple( "first2" ) ) );
pipe = new Every( pipe, new TestAggregator( new Fields( "count2" ), new Fields( "ip" ), new Tuple( "second" ), new Tuple( "second2" ), new Tuple( "second3" ) ) );
Tap sink = getPlatform().getTextFile( getOutputPath( "generatoraggregator" ), SinkMode.REPLACE );
Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe );
flow.complete();
validateLength( flow, 8 * 2 * 3, null );
}
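The expected length `8 * 2 * 3` in the test above can be checked with simple arithmetic. The first TestAggregator is given two tuples to emit per group, and the second is given three. The factor 8 is read as the number of distinct IP groups in the input. The test's expected count implies that the outputs of the two chained Every aggregators are combined per group as a cross product. The sketch below models that reading; `AggregatorCrossProduct` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical check of the arithmetic behind validateLength( flow, 8 * 2 * 3, null ). */
class AggregatorCrossProduct {
    /** Combines the outputs of two chained aggregators for one group as a cross product. */
    static List<String> combine(String group, List<String> first, List<String> second) {
        List<String> rows = new ArrayList<>();
        for (String a : first)
            for (String b : second)
                rows.add(group + "\t" + a + "\t" + b);
        return rows;
    }

    /** Total output rows when every group yields the same aggregator outputs. */
    static int totalRows(int groups, List<String> first, List<String> second) {
        int total = 0;
        for (int g = 0; g < groups; g++)
            total += combine("ip" + g, first, second).size();
        return total;
    }

    public static void main(String[] args) {
        List<String> first = List.of("first1", "first2");               // 2 tuples per group
        List<String> second = List.of("second", "second2", "second3"); // 3 tuples per group
        System.out.println(totalRows(8, first, second));               // prints 48
    }
}
```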
Code example origin: cwensel/cascading
// excerpt: earlier lines (including the declarations of sources, sink, splitter, listener and startTime) are not shown
getPlatform().copyFromLocal( inputFileUpper );
Tap sourceLower = new Hfs( new TextLine( new Fields( "offset", "line" ) ), inputFileLower );
Tap sourceUpper = new Hfs( new TextLine( new Fields( "offset", "line" ) ), inputFileUpper );
Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter );
pipeLower = new GroupBy( pipeLower, new Fields( "num" ) );
Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter );
pipeUpper = new GroupBy( pipeUpper, new Fields( "num" ) );
Pipe splice = new CoGroup( pipeLower, new Fields( "num" ), pipeUpper, new Fields( "num" ), Fields.size( 4 ) );
final Flow flow = getPlatform().getFlowConnector( getProperties() ).connect( sources, sink, splice );
flow.addListener( listener );
try
{
flow.start();
flow.complete();
return System.nanoTime() - startTime;
}
finally
{
// in the excerpt as extracted, flow.stop() followed the return and was unreachable; a finally block always runs it
flow.stop();
}
Code example origin: cwensel/cascading
void runTestCount( String name, Fields argumentSelector, Fields fieldDeclaration, Fields outputSelector ) throws Exception
{
getPlatform().copyFromLocal( inputFileIps );
Tap source = getPlatform().getTextFile( Fields.size( 2 ), inputFileIps );
Tap sink = getPlatform().getTextFile( Fields.size( 1 ), getOutputPath( name ), SinkMode.REPLACE );
Pipe pipe = new Pipe( "count" );
pipe = new GroupBy( pipe, new Fields( 1 ) );
pipe = new Every( pipe, argumentSelector, new Count( fieldDeclaration ), outputSelector );
Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe );
flow.start(); // simple test for start
flow.complete();
validateLength( flow, 17 );
assertTrue( getSinkAsList( flow ).contains( new Tuple( "63.123.238.8\t2" ) ) );
}
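The GroupBy + Every( Count ) assembly in `runTestCount` above emits one "key, count" tuple per group. A text sink writes the fields tab-separated, which is why the test looks for `"63.123.238.8\t2"`. The sketch below models that output with Java streams. `CountModel` is a hypothetical name, and the sample IPs are invented, not the contents of `inputFileIps`.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

/** Hypothetical model of GroupBy + Every( Count ) producing "key<TAB>count" lines. */
class CountModel {
    static List<String> countByKey(List<String> keys) {
        Map<String, Long> counts = keys.stream()
            .collect(Collectors.groupingBy(k -> k, TreeMap::new, Collectors.counting())); // sorted, like GroupBy
        return counts.entrySet().stream()
            .map(e -> e.getKey() + "\t" + e.getValue())
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> ips = List.of("63.123.238.8", "10.0.0.1", "63.123.238.8"); // invented sample data
        // prints [10.0.0.1<TAB>1, 63.123.238.8<TAB>2] with real tab characters
        System.out.println(countByKey(ips));
    }
}
```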
Code example origin: cwensel/cascading
@Test
public void testMerge()
{
Tap source1 = new Hfs( new TextLine( new Fields( "offset", "line" ) ), "foo/merge1" );
Tap source2 = new Hfs( new TextLine( new Fields( "offset", "line" ) ), "foo/merge2" );
Tap sink = new Hfs( new TextLine(), "foo" );
Pipe left = new Each( new Pipe( "left" ), new Fields( "line" ), new RegexFilter( ".*46.*" ) );
Pipe right = new Each( new Pipe( "right" ), new Fields( "line" ), new RegexFilter( ".*192.*" ) );
Pipe merge = new GroupBy( "merge", Pipe.pipes( left, right ), new Fields( "offset" ) );
Map sources = new HashMap();
sources.put( "left", source1 );
sources.put( "right", source2 );
Map sinks = new HashMap();
sinks.put( "merge", sink );
Flow flow = getPlatform().getFlowConnector().connect( sources, sinks, merge );
List<FlowStep> steps = flow.getFlowSteps();
assertEquals( "not equal: steps.size()", 1, steps.size() );
}
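In `testMerge` above, a single GroupBy receives two filtered branches, so records from both sides land in the same groups. The following hypothetical sketch models that: each branch applies its own regex filter, then both feed one grouping keyed on the offset. `MergeModel` and the sample records are invented. The within-group order here is plain insertion order, whereas a real GroupBy makes no such promise across branches.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical model of GroupBy over two branches: filter each branch, then merge into one grouping. */
class MergeModel {
    /** Adds the records of one branch that pass its filter to the shared groups, keyed by offset. */
    static void addMatching(Map<Integer, String> branch, String regex, Map<Integer, List<String>> groups) {
        branch.forEach((offset, line) -> {
            if (line.matches(regex))
                groups.computeIfAbsent(offset, k -> new ArrayList<>()).add(line);
        });
    }

    /** Filters each branch with the regexes used in the test, then groups both on the offset. */
    static Map<Integer, List<String>> mergeAndGroup(Map<Integer, String> left, Map<Integer, String> right) {
        Map<Integer, List<String>> groups = new TreeMap<>(); // sorted keys
        addMatching(left, ".*46.*", groups);   // the "left" branch filter
        addMatching(right, ".*192.*", groups); // the "right" branch filter
        return groups;
    }

    public static void main(String[] args) {
        Map<Integer, String> left = Map.of(0, "46 a", 10, "x");
        Map<Integer, String> right = Map.of(0, "192 b", 20, "192 c");
        System.out.println(mergeAndGroup(left, right)); // prints {0=[46 a, 192 b], 20=[192 c]}
    }
}
```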
Code example origin: cwensel/cascading
@Test
public void testStartStopRace() throws Exception
{
getPlatform().copyFromLocal( inputFileLower );
Tap sourceLower = new Hfs( new TextLine( new Fields( "offset", "line" ) ), inputFileLower );
Map sources = new HashMap();
sources.put( "lower", sourceLower );
Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );
// using null pos so all fields are written
Tap sink = new Hfs( new TextLine(), getOutputPath( "startstop" ), SinkMode.REPLACE );
Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter );
pipeLower = new GroupBy( pipeLower, new Fields( "num" ) );
Flow flow = getPlatform().getFlowConnector( getProperties() ).connect( sources, sink, pipeLower );
flow.start();
flow.stop(); // should not fail
}
Code example origin: cwensel/cascading
@Test
public void testStartWithoutComplete() throws Exception
{
getPlatform().copyFromLocal( inputFileLower );
Tap sourceLower = new Hfs( new TextLine( new Fields( "offset", "line" ) ), inputFileLower );
Map sources = new HashMap();
sources.put( "lower", sourceLower );
Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );
// using null pos so all fields are written
Tap sink = new Hfs( new TextLine(), getOutputPath( "withoutcomplete" ), SinkMode.REPLACE );
Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter );
pipeLower = new GroupBy( pipeLower, new Fields( "num" ) );
Flow flow = getPlatform().getFlowConnector( getProperties() ).connect( sources, sink, pipeLower );
LockingFlowListener listener = new LockingFlowListener();
flow.addListener( listener );
flow.start();
assertTrue( listener.completed.tryAcquire( 90, TimeUnit.SECONDS ) );
}
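`testStartWithoutComplete` above calls `flow.start()` without `flow.complete()`. It then waits on the listener with `tryAcquire( 90, TimeUnit.SECONDS )`. LockingFlowListener's source is not part of this excerpt. The sketch assumes it releases a semaphore permit from its completion callback, which is what that call implies. `LockingListenerModel` is a hypothetical name, and a plain thread stands in for the asynchronously running flow.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

/** Hypothetical model of a listener that releases a permit when an asynchronously started job completes. */
class LockingListenerModel {
    final Semaphore completed = new Semaphore(0); // no permits until completion

    void onCompleted() { completed.release(); }

    /** Waits up to the given time for completion, as the test does with tryAcquire. */
    boolean awaitCompletion(long seconds) {
        try {
            return completed.tryAcquire(seconds, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /** Stands in for flow.start(): runs the work on a background thread, then notifies the listener. */
    static Thread start(Runnable work, LockingListenerModel listener) {
        Thread t = new Thread(() -> { work.run(); listener.onCompleted(); });
        t.start();
        return t;
    }

    public static void main(String[] args) {
        LockingListenerModel listener = new LockingListenerModel();
        start(() -> { /* pretend to process data */ }, listener);
        System.out.println(listener.awaitCompletion(90)); // prints true
    }
}
```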
Code example origin: cwensel/cascading
@Test
public void testFailOnMissingSuccessFlowListener() throws Exception
{
getPlatform().copyFromLocal( inputFileLower );
FlowListener listener = new FailOnMissingSuccessFlowListener();
Hfs source = new Hfs( new TextLine( new Fields( "offset", "line" ) ), inputFileLower );
Hfs success = new Hfs( new TextLine(), getOutputPath( "withsuccess" ), SinkMode.REPLACE );
Hfs without = new Hfs( new TextLine(), getOutputPath( "withoutsuccess" ), SinkMode.REPLACE );
Hfs sink = new Hfs( new TextLine(), getOutputPath( "final" ), SinkMode.REPLACE );
Flow firstFlow = getPlatform().getFlowConnector( getProperties() ).connect( source, success, new Pipe( "lower" ) );
firstFlow.addListener( listener );
firstFlow.complete();
Flow secondFlow = getPlatform().getFlowConnector( getProperties() ).connect( success, without, new Pipe( "lower" ) );
secondFlow.addListener( listener );
secondFlow.complete();
Hfs successTap = new Hfs( new TextLine(), new Path( without.getPath(), "_SUCCESS" ).toString() );
assertTrue( successTap.deleteResource( getPlatform().getFlowProcess() ) );
Flow thirdFlow = getPlatform().getFlowConnector( getProperties() ).connect( without, sink, new Pipe( "lower" ) );
thirdFlow.addListener( listener );
try
{
thirdFlow.complete();
fail( "listener did not fail flow" );
}
catch( FlowException exception )
{
// do nothing
}
}
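In `testFailOnMissingSuccessFlowListener` above, the third flow is expected to fail once the `_SUCCESS` file in its input directory has been deleted. The listener's implementation is not shown. The following hypothetical sketch models the idea with `java.nio`: refuse to start when an input directory lacks the `_SUCCESS` marker, which Hadoop output committers typically write. `SuccessMarkerCheck` and its methods are invented names.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;

/** Hypothetical model of a "fail on missing _SUCCESS" check performed before a flow starts. */
class SuccessMarkerCheck {
    static void onStarting(Path inputDir) {
        if (!Files.exists(inputDir.resolve("_SUCCESS")))
            throw new IllegalStateException("missing _SUCCESS marker in " + inputDir);
    }

    /** Runs the check against one directory with the marker and one without. */
    static String runDemo() {
        try {
            Path withSuccess = Files.createTempDirectory("withsuccess");
            Files.createFile(withSuccess.resolve("_SUCCESS"));
            onStarting(withSuccess); // passes: the marker is present
            Path without = Files.createTempDirectory("withoutsuccess");
            try {
                onStarting(without);
                return "unexpected: no failure";
            } catch (IllegalStateException expected) {
                return "failed as expected";
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(runDemo()); // prints failed as expected
    }
}
```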
Code example origin: cascading/cascading-hadoop2-mr1
@Test
public void testDupeSourceRepeat()
{
Tap source1 = new Hfs( new TextLine( new Fields( "offset", "line" ) ), "foo/merge" );
Tap sink = new Hfs( new TextLine(), "foo" );
Pipe pipe = new Pipe( "pipe" );
Pipe merge = new CoGroup( "cogroup", pipe, new Fields( "offset" ), 1, Fields.size( 4 ) );
Map sources = new HashMap();
sources.put( "pipe", source1 );
Map sinks = new HashMap();
sinks.put( "cogroup", sink );
Flow flow = getPlatform().getFlowConnector().connect( sources, sinks, merge );
List<FlowStep> steps = flow.getFlowSteps();
assertEquals( "not equal: steps.size()", 1, steps.size() );
}
Code example origin: cwensel/cascading
/** Tests that proper pipe graph is assembled without throwing an internal error */
@Test
public void testPipeAssembly()
{
Pipe pipe = new TestAssembly( "test" );
pipe = new GroupBy( pipe, new Fields( "ip" ) );
Tap source = getPlatform().getTextFile( "foo" );
Tap sink = getPlatform().getTextFile( "foo/split1", SinkMode.REPLACE );
List<FlowStep> steps = getPlatform().getFlowConnector().connect( source, sink, pipe ).getFlowSteps();
assertEquals( "not equal: steps.size()", 1, steps.size() );
}
Code example origin: cwensel/cascading
@Test
public void testNotLocalMode() throws Exception
{
if( !getPlatform().isUseCluster() )
return;
Tap source = new Hfs( new TextLine(), "input/path" );
Tap sink = new Hfs( new TextLine(), "output/path", SinkMode.REPLACE );
Pipe pipe = new Pipe( "test" );
Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe );
List<FlowStep> steps = flow.getFlowSteps();
assertEquals( "wrong size", 1, steps.size() );
FlowStep step = steps.get( 0 );
boolean isLocal = HadoopUtil.isLocal( (Configuration) ( (BaseFlowStep) step ).createInitializedConfig( flow.getFlowProcess(), ( (BaseHadoopPlatform) getPlatform() ).getConfiguration() ) );
assertTrue( "is local", !isLocal );
}
Code example origin: cwensel/cascading
@Test
public void testLocalModeSink() throws Exception
{
Tap source = new Hfs( new TextLine(), "input/path" );
Tap sink = new Lfs( new TextLine(), "output/path", SinkMode.REPLACE );
Pipe pipe = new Pipe( "test" );
Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe );
List<FlowStep> steps = flow.getFlowSteps();
assertEquals( "wrong size", 1, steps.size() );
FlowStep step = steps.get( 0 );
boolean isLocal = HadoopUtil.isLocal( (Configuration) step.getConfig() );
assertTrue( "is not local", isLocal );
}
Code example origin: cwensel/cascading
@Test
public void testProcessFlowWithCounters() throws IOException
{
getPlatform().copyFromLocal( inputFileIps );
Map<String, Map<String, Long>> counters = new HashMap<String, Map<String, Long>>();
Map<String, Long> innerMap = new HashMap<String, Long>();
innerMap.put( "inner-key", 42L );
counters.put( "outer-key", innerMap );
Flow process = flowWithCounters( "counter", counters );
process.complete();
FlowStats flowStats = process.getFlowStats();
assertNotNull( flowStats );
List children = new ArrayList( flowStats.getChildren() );
assertEquals( 1, children.size() );
ProcessStepStats stepStats = (ProcessStepStats) children.get( 0 );
assertEquals( counters.keySet(), stepStats.getCounterGroups() );
assertEquals( innerMap.keySet(), stepStats.getCountersFor( "outer-key" ) );
assertEquals( 42L, stepStats.getCounterValue( "outer-key", "inner-key" ) );
}
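`testProcessFlowWithCounters` above queries a two-level counter structure (group → counter name → value). The sketch below is a hypothetical model of that structure. The method names are borrowed from the `ProcessStepStats` calls in the test. The bodies, including returning 0 for a missing counter, are assumptions and not Cascading's implementation.

```java
import java.util.Map;
import java.util.Set;

/** Hypothetical model of the nested counter map checked in testProcessFlowWithCounters. */
class CounterModel {
    final Map<String, Map<String, Long>> counters; // group -> (counter name -> value)

    CounterModel(Map<String, Map<String, Long>> counters) { this.counters = counters; }

    Set<String> getCounterGroups() { return counters.keySet(); }

    Set<String> getCountersFor(String group) {
        return counters.getOrDefault(group, Map.of()).keySet();
    }

    /** ASSUMPTION: a missing group or counter reads as 0. */
    long getCounterValue(String group, String counter) {
        return counters.getOrDefault(group, Map.of()).getOrDefault(counter, 0L);
    }

    public static void main(String[] args) {
        CounterModel stats = new CounterModel(Map.of("outer-key", Map.of("inner-key", 42L)));
        System.out.println(stats.getCounterValue("outer-key", "inner-key")); // prints 42
    }
}
```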
Code example origin: cwensel/cascading
@Test
public void testProcessFlowFlowListenerExceptionHandlingInComplete() throws IOException, InterruptedException
{
ThrowableListener listener = new ThrowableListener();
getPlatform().copyFromLocal( inputFileIps );
String path = "completeException";
Flow process = flowWithException( path, FailingRiffle.Failing.COMPLETE );
process.addListener( listener );
try
{
process.complete();
fail( "there should have been an exception" );
}
catch( CascadingException exception )
{
assertNotNull( listener.getThrowable() );
}
}
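The test above expects two things: the listener records the throwable, and `complete()` still throws. One way to get that behaviour is to offer the failure to every listener and rethrow it when none reports it as handled. The sketch below models this under that assumption. `ThrowableHandlingModel` and `RecordingListener` are hypothetical names, and `IllegalStateException` stands in for CascadingException.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model: a failure is offered to listeners; if none handles it, it is rethrown. */
class ThrowableHandlingModel {
    interface ThrowableListener { boolean onThrowable(Throwable t); }

    /** Records the throwable but reports it as unhandled, so the caller still sees the failure. */
    static class RecordingListener implements ThrowableListener {
        Throwable seen;
        public boolean onThrowable(Throwable t) { seen = t; return false; }
    }

    final List<ThrowableListener> listeners = new ArrayList<>();

    void complete(Runnable work) {
        try {
            work.run();
        } catch (RuntimeException failure) {
            boolean handled = false;
            for (ThrowableListener l : listeners)
                handled |= l.onThrowable(failure); // every listener is notified
            if (!handled)
                throw new IllegalStateException("flow failed", failure); // stands in for CascadingException
        }
    }

    public static void main(String[] args) {
        ThrowableHandlingModel flow = new ThrowableHandlingModel();
        RecordingListener listener = new RecordingListener();
        flow.listeners.add(listener);
        try {
            flow.complete(() -> { throw new RuntimeException("boom"); });
        } catch (IllegalStateException e) {
            System.out.println("rethrown; listener saw: " + listener.seen.getMessage()); // prints ... boom
        }
    }
}
```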
Code example origin: cwensel/cascading
default Stream<Tuple> getSinkTupleStreamCopy( String name, Fields selector )
{
return TupleStream.tupleStream( getSink( name ), getFlowProcess(), selector );
}
Code example origin: LiveRamp/cascading_ext
@Override
public void complete(JobPersister persister, boolean failOnCounterFetch) {
AtomicBoolean isComplete = new AtomicBoolean(false);
flow.addListener(new StopListener(isComplete));
flow.addStepListener(new JobRecordListener(
persister,
failOnCounterFetch
));
flow.complete();
// TODO kill skipCompleteListener once we figure out the cascading internal NPE (upgrade past 2.5.1 maybe?)
if (!isComplete.get() && !skipCompleteListener) {
throw new RuntimeException("Flow terminated but did not complete! Possible shutdown hook invocation.");
}
}
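The `complete` method above treats a return from `flow.complete()` as insufficient proof of success. It also checks a flag that a listener sets, and throws if the flag is still false. StopListener's internals are not part of the excerpt. The sketch below assumes the flag is set by a completion callback, and that a flow stopped early (for example by a shutdown hook) returns without firing it. `CompletionGuard` and `runFlow` are hypothetical names.

```java
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical model: returning from complete() is not proof that the flow actually completed. */
class CompletionGuard {
    interface Callback { void onCompleted(); }

    /** Stands in for flow.complete(); a flow stopped early returns without firing onCompleted. */
    static void runFlow(boolean stoppedEarly, Callback listener) {
        if (!stoppedEarly) listener.onCompleted();
    }

    static void complete(boolean stoppedEarly) {
        AtomicBoolean isComplete = new AtomicBoolean(false);
        runFlow(stoppedEarly, () -> isComplete.set(true));
        if (!isComplete.get())
            throw new RuntimeException("Flow terminated but did not complete!");
    }

    public static void main(String[] args) {
        complete(false);
        System.out.println("normal run passed");
        try {
            complete(true);
        } catch (RuntimeException e) {
            System.out.println(e.getMessage()); // prints Flow terminated but did not complete!
        }
    }
}
```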
Code example origin: cwensel/cascading
@Test
public void testProcessFlowFlowListenerExceptionHandlingInStop() throws IOException, InterruptedException
{
ThrowableListener listener = new ThrowableListener();
getPlatform().copyFromLocal( inputFileIps );
String path = "stopException";
Flow process = flowWithException( path, FailingRiffle.Failing.STOP );
process.addListener( listener );
process.start();
try
{
process.stop();
fail( "there should have been an exception" );
}
catch( CascadingException exception )
{
assertNotNull( listener.getThrowable() );
}
}
Code example origin: org.springframework.data/spring-cascading
@Override
public void afterPropertiesSet() throws Exception {
flow = createFlow();
if (skipStrategy != null) {
flow.setFlowSkipStrategy(skipStrategy);
}
if (stepStrategy != null) {
flow.setFlowStepStrategy(stepStrategy);
}
if (listeners != null) {
for (FlowListener listener : listeners) {
flow.addListener(listener);
}
}
if (StringUtils.hasText(writeDOT)) {
flow.writeDOT(writeDOT);
}
if (StringUtils.hasText(writeStepsDOT)) {
flow.writeStepsDOT(writeStepsDOT);
}
if (priority != null) {
flow.setSubmitPriority(priority);
}
}
Code example origin: LiveRamp/cascading_ext
public static List<Counter> getCountersForGroup(Flow flow, String group) throws IOException {
return getCountersForGroup(flow.getFlowStats(), group);
}