本文整理了Java中cascading.flow.Flow.start()
方法的一些代码示例,展示了Flow.start()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Flow.start()
方法的具体详情如下:
包路径:cascading.flow.Flow
类名称:Flow
方法名:start
[英]Method start begins the execution of this Flow instance. It will return immediately. Use the method #complete()to block until this Flow completes.
[中]方法start开始执行此流实例。它将立即返回。使用方法#complete()阻塞,直到此流完成。
代码示例来源:origin: com.twitter/scalding-core_2.10
public static <Config, T> Future<T> start(Flow<Config> flow, final scala.Function1<Flow<Config>, T> fn) {
final Promise<T> result = Promise$.MODULE$.<T>apply();
flow.addListener(new FlowListener() {
public void onStarting(Flow f) { } // ignore
public void onStopping(Flow f) { } // ignore
public void onCompleted(Flow f) {
// This is always called, but onThrowable is called first
if(!result.isCompleted()) {
// we use the above rather than trySuccess to avoid calling fn twice
try {
T toPut = (T) fn.apply(f);
result.success(toPut);
}
catch(Throwable t) {
result.failure(t);
}
}
}
public boolean onThrowable(Flow f, Throwable t) {
result.failure(t);
// The exception is handled by the owner of the promise and should not be rethrown
return true;
}
});
flow.start();
return result.future();
}
}
代码示例来源:origin: com.twitter/scalding-core
public static <Config, T> Future<T> start(Flow<Config> flow, final scala.Function1<Flow<Config>, T> fn) {
final Promise<T> result = Promise$.MODULE$.<T>apply();
flow.addListener(new FlowListener() {
public void onStarting(Flow f) { } // ignore
public void onStopping(Flow f) { } // ignore
public void onCompleted(Flow f) {
// This is always called, but onThrowable is called first
if(!result.isCompleted()) {
// we use the above rather than trySuccess to avoid calling fn twice
try {
T toPut = (T) fn.apply(f);
result.success(toPut);
}
catch(Throwable t) {
result.failure(t);
}
}
}
public boolean onThrowable(Flow f, Throwable t) {
result.failure(t);
// The exception is handled by the owner of the promise and should not be rethrown
return true;
}
});
flow.start();
return result.future();
}
}
代码示例来源:origin: stackoverflow.com
public class EventSystemDemo extends Application {
@Override
public void start(Stage primaryStage) throws Exception {
HBox box = new HBox();
box.setSpacing(12);
box.setPadding(new Insets(12));
box.setFillHeight(true);
box.setAlignment(Pos.CENTER);
Flow senderFlow = new Flow(ProducerController.class);
box.getChildren().add(senderFlow.start());
Flow receiverFlow = new Flow(ReceiverController.class);
box.getChildren().add(receiverFlow.start());
primaryStage.setScene(new Scene(box));
primaryStage.show();
}
public static void main(String... args) {
launch(args);
}
}
代码示例来源:origin: cascading/cascading-hadoop2-common
flow.start();
代码示例来源:origin: cwensel/cascading
flow.start();
代码示例来源:origin: cwensel/cascading
@Test
public void testProcessFlowFlowListenerExceptionHandlingInStart() throws IOException, InterruptedException
{
ThrowableListener listener = new ThrowableListener();
getPlatform().copyFromLocal( inputFileIps );
String path = "startException";
Flow process = flowWithException( path, FailingRiffle.Failing.START );
process.addListener( listener );
try
{
process.start();
fail( "there should have been an exception" );
}
catch( CascadingException exception )
{
assertNotNull( listener.getThrowable() );
}
}
代码示例来源:origin: cascading/cascading-platform
@Test
public void testProcessFlowFlowListenerExceptionHandlingInStart() throws IOException, InterruptedException
{
ThrowableListener listener = new ThrowableListener();
getPlatform().copyFromLocal( inputFileIps );
String path = "startException";
Flow process = flowWithException( path, FailingRiffle.Failing.START );
process.addListener( listener );
try
{
process.start();
fail( "there should have been an exception" );
}
catch( CascadingException exception )
{
assertNotNull( listener.getThrowable() );
}
}
代码示例来源:origin: cwensel/cascading
flow.start();
代码示例来源:origin: cwensel/cascading
@Test
public void testProcessFlowFlowListenerExceptionHandlingInStop() throws IOException, InterruptedException
{
ThrowableListener listener = new ThrowableListener();
getPlatform().copyFromLocal( inputFileIps );
String path = "stopException";
Flow process = flowWithException( path, FailingRiffle.Failing.STOP );
process.addListener( listener );
process.start();
try
{
process.stop();
fail( "there should have been an exception" );
}
catch( CascadingException exception )
{
assertNotNull( listener.getThrowable() );
}
}
代码示例来源:origin: alexholmes/hadoop-book
parsedLogFlow.start();
代码示例来源:origin: cascading/cascading-platform
@Test
public void testProcessFlowFlowListenerExceptionHandlingInStop() throws IOException, InterruptedException
{
ThrowableListener listener = new ThrowableListener();
getPlatform().copyFromLocal( inputFileIps );
String path = "stopException";
Flow process = flowWithException( path, FailingRiffle.Failing.STOP );
process.addListener( listener );
process.start();
try
{
process.stop();
fail( "there should have been an exception" );
}
catch( CascadingException exception )
{
assertNotNull( listener.getThrowable() );
}
}
代码示例来源:origin: cwensel/cascading
@Test
public void testStartWithoutComplete() throws Exception
{
getPlatform().copyFromLocal( inputFileLower );
Tap sourceLower = new Hfs( new TextLine( new Fields( "offset", "line" ) ), inputFileLower );
Map sources = new HashMap();
sources.put( "lower", sourceLower );
Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );
// using null pos so all fields are written
Tap sink = new Hfs( new TextLine(), getOutputPath( "withoutcomplete" ), SinkMode.REPLACE );
Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter );
pipeLower = new GroupBy( pipeLower, new Fields( "num" ) );
Flow flow = getPlatform().getFlowConnector( getProperties() ).connect( sources, sink, pipeLower );
LockingFlowListener listener = new LockingFlowListener();
flow.addListener( listener );
flow.start();
assertTrue( listener.completed.tryAcquire( 90, TimeUnit.SECONDS ) );
}
代码示例来源:origin: cascading/cascading-hadoop2-common
@Test
public void testStartWithoutComplete() throws Exception
{
getPlatform().copyFromLocal( inputFileLower );
Tap sourceLower = new Hfs( new TextLine( new Fields( "offset", "line" ) ), inputFileLower );
Map sources = new HashMap();
sources.put( "lower", sourceLower );
Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );
// using null pos so all fields are written
Tap sink = new Hfs( new TextLine(), getOutputPath( "withoutcomplete" ), SinkMode.REPLACE );
Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter );
pipeLower = new GroupBy( pipeLower, new Fields( "num" ) );
Flow flow = getPlatform().getFlowConnector( getProperties() ).connect( sources, sink, pipeLower );
LockingFlowListener listener = new LockingFlowListener();
flow.addListener( listener );
flow.start();
assertTrue( listener.completed.tryAcquire( 90, TimeUnit.SECONDS ) );
}
代码示例来源:origin: cwensel/cascading
flow.start();
代码示例来源:origin: cascading/cascading-hadoop2-common
flow.start();
代码示例来源:origin: cascading/cascading-platform
void runTestCount( String name, Fields argumentSelector, Fields fieldDeclaration, Fields outputSelector ) throws Exception
{
getPlatform().copyFromLocal( inputFileIps );
Tap source = getPlatform().getTextFile( Fields.size( 2 ), inputFileIps );
Tap sink = getPlatform().getTextFile( Fields.size( 1 ), getOutputPath( name ), SinkMode.REPLACE );
Pipe pipe = new Pipe( "count" );
pipe = new GroupBy( pipe, new Fields( 1 ) );
pipe = new Every( pipe, argumentSelector, new Count( fieldDeclaration ), outputSelector );
Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe );
flow.start(); // simple test for start
flow.complete();
validateLength( flow, 17 );
assertTrue( getSinkAsList( flow ).contains( new Tuple( "63.123.238.8\t2" ) ) );
}
代码示例来源:origin: cwensel/cascading
@Test
public void testStartStopRace() throws Exception
{
getPlatform().copyFromLocal( inputFileLower );
Tap sourceLower = new Hfs( new TextLine( new Fields( "offset", "line" ) ), inputFileLower );
Map sources = new HashMap();
sources.put( "lower", sourceLower );
Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );
// using null pos so all fields are written
Tap sink = new Hfs( new TextLine(), getOutputPath( "startstop" ), SinkMode.REPLACE );
Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter );
pipeLower = new GroupBy( pipeLower, new Fields( "num" ) );
Flow flow = getPlatform().getFlowConnector( getProperties() ).connect( sources, sink, pipeLower );
flow.start();
flow.stop(); // should not fail
}
代码示例来源:origin: cascading/cascading-hadoop2-common
@Test
public void testStartStopRace() throws Exception
{
getPlatform().copyFromLocal( inputFileLower );
Tap sourceLower = new Hfs( new TextLine( new Fields( "offset", "line" ) ), inputFileLower );
Map sources = new HashMap();
sources.put( "lower", sourceLower );
Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );
// using null pos so all fields are written
Tap sink = new Hfs( new TextLine(), getOutputPath( "startstop" ), SinkMode.REPLACE );
Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter );
pipeLower = new GroupBy( pipeLower, new Fields( "num" ) );
Flow flow = getPlatform().getFlowConnector( getProperties() ).connect( sources, sink, pipeLower );
flow.start();
flow.stop(); // should not fail
}
代码示例来源:origin: cwensel/cascading
void runTestCount( String name, Fields argumentSelector, Fields fieldDeclaration, Fields outputSelector ) throws Exception
{
getPlatform().copyFromLocal( inputFileIps );
Tap source = getPlatform().getTextFile( Fields.size( 2 ), inputFileIps );
Tap sink = getPlatform().getTextFile( Fields.size( 1 ), getOutputPath( name ), SinkMode.REPLACE );
Pipe pipe = new Pipe( "count" );
pipe = new GroupBy( pipe, new Fields( 1 ) );
pipe = new Every( pipe, argumentSelector, new Count( fieldDeclaration ), outputSelector );
Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe );
flow.start(); // simple test for start
flow.complete();
validateLength( flow, 17 );
assertTrue( getSinkAsList( flow ).contains( new Tuple( "63.123.238.8\t2" ) ) );
}
内容来源于网络,如有侵权,请联系作者删除!