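This article collects code examples for the Java method cascading.tap.Tap.getConfigDef() and shows how Tap.getConfigDef() is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven. They were extracted from selected projects and are intended as practical reference material. Details of the Tap.getConfigDef() method are as follows: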
Package: cascading.tap
Class name: Tap
Method name: getConfigDef
Description: Returns a cascading.property.ConfigDef instance that allows local properties to be set and made available via the resulting cascading.flow.FlowProcess instance when the tap is invoked.
Properties set on the ConfigDef will not show up in any Flow or cascading.flow.FlowStep process-level configuration. They will, however, override those values as seen by any Tap instance method that receives a FlowProcess, except for sourceConfInit(cascading.flow.FlowProcess, Object) and sinkConfInit(cascading.flow.FlowProcess, Object).
That is, the confInit methods are called before any ConfigDef is applied, so values placed into a ConfigDef instance will not be visible to them.
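The override behavior described above can be illustrated with a small self-contained sketch. It does not use Cascading itself: LocalConfigDef, its resolve method, and the two-value Mode enum are hypothetical stand-ins written for this illustration, and the rule they implement (REPLACE always wins, DEFAULT only fills in a missing value) is an assumption based on the documented meaning of the ConfigDef modes, not a copy of the library's code. The process-level map is never modified, which mirrors the statement that tap-local properties do not show up in the Flow or FlowStep configuration.

```java
import java.util.HashMap;
import java.util.Map;

class ConfigDefSketch {
    // Hypothetical stand-in for cascading.property.ConfigDef.Mode (UPDATE is omitted here).
    enum Mode { DEFAULT, REPLACE }

    // Hypothetical stand-in for a tap-local ConfigDef: values are stored per mode.
    static class LocalConfigDef {
        private final Map<String, String> defaults = new HashMap<>();
        private final Map<String, String> replacements = new HashMap<>();

        LocalConfigDef setProperty(Mode mode, String key, String value) {
            (mode == Mode.DEFAULT ? defaults : replacements).put(key, value);
            return this;
        }

        // Resolves a key as the tap would see it, without touching processConfig.
        String resolve(String key, Map<String, String> processConfig) {
            if (replacements.containsKey(key)) {
                return replacements.get(key); // REPLACE overrides any process-level value
            }
            String existing = processConfig.get(key);
            if (existing != null) {
                return existing;              // an existing value beats a DEFAULT
            }
            return defaults.get(key);         // DEFAULT only fills a gap (may be null)
        }
    }

    public static void main(String[] args) {
        // Process-level configuration, as a Flow or FlowStep would hold it.
        Map<String, String> processConfig = new HashMap<>();
        processConfig.put("default", "process-value");
        processConfig.put("replace", "process-value");

        LocalConfigDef tapConfigDef = new LocalConfigDef()
            .setProperty(Mode.DEFAULT, "default", "tap-default")
            .setProperty(Mode.REPLACE, "replace", "tap-replace")
            .setProperty(Mode.DEFAULT, "missing", "tap-default");

        System.out.println(tapConfigDef.resolve("default", processConfig)); // process-value
        System.out.println(tapConfigDef.resolve("replace", processConfig)); // tap-replace
        System.out.println(tapConfigDef.resolve("missing", processConfig)); // tap-default
        System.out.println(processConfig.containsKey("missing"));           // false
    }
}
```

With the real API, the corresponding call would be tap.getConfigDef().setProperty( mode, key, value ), as the examples below show.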
Code example source: cwensel/cascading
@Override
public ConfigDef getConfigDef()
{
  // delegate to the wrapped tap held in the 'original' field
  return original.getConfigDef();
}
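Code example source: cwensel/cascading
// excerpt: the enclosing method and the construct that 'break' exits are not shown
// the trap's ConfigDef is passed to the flow process that the trap handler uses
FlowProcess elementFlowProcess = new ElementFlowProcess( flowProcess, trap.getConfigDef() );
elementDuct.setTrapHandler( new TrapHandler( elementFlowProcess, flowElement, trap, branchName ) );
break;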
Code example source: cwensel/cascading
private Tap makeTempTap( FlowElementGraph graph, Pipe pipe, String defaultDecoratorClassName )
{
  Tap checkpointTap = graph.getCheckpointsMap().get( pipe.getName() );

  if( checkpointTap != null )
  {
    LOG.info( "found checkpoint: {}, using tap: {}", pipe.getName(), checkpointTap );
    checkpointTap = decorateTap( pipe, checkpointTap, FlowConnectorProps.CHECKPOINT_TAP_DECORATOR_CLASS, null );
  }

  if( checkpointTap == null )
  {
    // only restart from a checkpoint pipe or checkpoint tap below
    if( pipe instanceof Checkpoint )
    {
      checkpointTap = makeTempTap( checkpointTapRootPath, pipe.getName() );
      checkpointTap = decorateTap( pipe, checkpointTap, FlowConnectorProps.CHECKPOINT_TAP_DECORATOR_CLASS, null );
      // mark as an anonymous checkpoint
      checkpointTap.getConfigDef().setProperty( ConfigDef.Mode.DEFAULT, "cascading.checkpoint", "true" );
    }
    else
    {
      checkpointTap = makeTempTap( pipe.getName() );
    }
  }

  return decorateTap( pipe, checkpointTap, FlowConnectorProps.TEMPORARY_TAP_DECORATOR_CLASS, defaultDecoratorClassName );
}
Code example source: cascading/cascading-platform (the same test also appears in cwensel/cascading)
@Test
public void testTapSinkConfigDef() throws IOException
{
  getPlatform().copyFromLocal( inputFileNums20 );

  Tap source = getPlatform().getTextFile( new Fields( "line" ), inputFileNums20, SinkMode.KEEP );

  Pipe pipe = new Pipe( "test" );
  pipe = new Each( pipe, new Insert( new Fields( "value" ), "nada" ), Fields.ALL );

  Scheme scheme = getPlatform().getTestConfigDefScheme();
  Tap sink = getPlatform().getTap( scheme, getOutputPath( "tapsinkconfigdef" ), SinkMode.REPLACE );

  // process -> after sink/sourceConfInit are called
  // default -> Wrapper for all cluster side calls
  sink.getConfigDef().setProperty( Mode.DEFAULT, "default", "sink-default" );
  // steps on above value
  sink.getStepConfigDef().setProperty( Mode.DEFAULT, "default", "process-default" );

  sink.getConfigDef().setProperty( Mode.DEFAULT, "replace", "sink-default" );
  sink.getConfigDef().setProperty( Mode.REPLACE, "replace", "sink-replace" );

  sink.getNodeConfigDef().setProperty( Mode.REPLACE, "default-node", "node-replace" );

  sink.getStepConfigDef().setProperty( Mode.DEFAULT, "replace", "process-default" );
  sink.getStepConfigDef().setProperty( Mode.REPLACE, "replace", "process-replace" );
  sink.getStepConfigDef().setProperty( Mode.DEFAULT, "default-node", "process-default" );

  Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe );
  flow.complete();

  assertTrue( flow.resourceExists( sink ) );
}
Code example source: cascading/cascading-platform (the same test also appears in cwensel/cascading)
@Test
public void testTapSourceConfigDef() throws IOException
{
  getPlatform().copyFromLocal( inputFileNums20 );

  Scheme scheme = getPlatform().getTestConfigDefScheme();
  Tap source = getPlatform().getTap( scheme, inputFileNums20, SinkMode.KEEP );

  // process -> after sink/sourceConfInit are called
  // default -> Wrapper for all cluster side calls
  source.getConfigDef().setProperty( Mode.DEFAULT, "default", "source-default" );
  // steps on above value
  source.getStepConfigDef().setProperty( Mode.DEFAULT, "default", "process-default" );

  source.getConfigDef().setProperty( Mode.DEFAULT, "replace", "source-default" );
  source.getConfigDef().setProperty( Mode.REPLACE, "replace", "source-replace" );

  source.getNodeConfigDef().setProperty( Mode.REPLACE, "default-node", "node-replace" );

  source.getStepConfigDef().setProperty( Mode.DEFAULT, "replace", "process-default" );
  source.getStepConfigDef().setProperty( Mode.REPLACE, "replace", "process-replace" );
  source.getStepConfigDef().setProperty( Mode.DEFAULT, "default-node", "process-default" );

  Pipe pipe = new Pipe( "test" );
  pipe = new Each( pipe, new Insert( new Fields( "value" ), "nada" ), Fields.ALL );

  Tap sink = getPlatform().getTextFile( getOutputPath( "tapsourceconfigdef" ), SinkMode.REPLACE );

  Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe );
  flow.complete();

  assertTrue( flow.resourceExists( sink ) );
}
Code example source: cwensel/cascading (the same test also appears in cascading/cascading-platform)
@Test
public void testTrapDiagnosticsLocalConfig() throws Exception
{
  getPlatform().copyFromLocal( inputFileApache );

  Tap source = getPlatform().getTextFile( inputFileApache );

  Pipe pipe = new Pipe( "map" );
  pipe = new Each( pipe, new Fields( "line" ), new RegexParser( new Fields( "ip" ), "^[^ ]*" ), new Fields( "ip" ) );
  // always fail
  pipe = new Each( pipe, new Fields( "ip" ), new TestFunction( new Fields( "test" ), null ), Fields.ALL );
  pipe = new GroupBy( "reduce", pipe, new Fields( "ip" ) );
  pipe = new Every( pipe, new Count(), new Fields( "ip", "count" ) );

  Tap sink = getPlatform().getTextFile( getOutputPath( "diagconfigdef/tap" + NONDETERMINISTIC ), SinkMode.REPLACE );
  Tap trap = getPlatform().getTabDelimitedFile( Fields.ALL, getOutputPath( "diagconfigdef/trap" + NONDETERMINISTIC ), SinkMode.REPLACE );

  Map<Object, Object> properties = getProperties();

  // set the trap properties on the trap's own ConfigDef rather than on the flow properties
  TrapProps.trapProps()
    .recordAllDiagnostics()
    .setProperties( trap.getConfigDef(), ConfigDef.Mode.DEFAULT );

  Flow flow = getPlatform().getFlowConnector( properties ).connect( "trap test", source, sink, trap, pipe );
  flow.complete();

  validateLength( flow, 0 );
  validateLength( flow.openTrap(), 10, 4, Pattern.compile( ".*TrapPlatformTest.*" ) ); // 4 columns, not 1
}