cascading.tap.Tap.getConfigDef()方法的使用及代码示例

x33g5p2x  于2022-01-30 转载在 其他  
字(11.1k)|赞(0)|评价(0)|浏览(122)

本文整理了Java中cascading.tap.Tap.getConfigDef()方法的一些代码示例,展示了Tap.getConfigDef()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。Tap.getConfigDef()方法的具体详情如下:
包路径:cascading.tap.Tap
类名称:Tap
方法名:getConfigDef

Tap.getConfigDef介绍

[英]Returns a cascading.property.ConfigDef instance that allows for local properties to be set and made available via a resulting cascading.flow.FlowProcess instance when the tap is invoked.

Any properties set on the configDef will not show up in any Flow or cascading.flow.FlowStep process level configuration, but will override any of those values as seen by the current Tap instance method call where a FlowProcess is provided except for the #sourceConfInit(cascading.flow.FlowProcess,Object) and #sinkConfInit(cascading.flow.FlowProcess,Object) methods.

That is, the confInit methods are called before any ConfigDef is applied, so any values placed into a ConfigDef instance will not be visible to them.
[中]返回一个cascading.property.ConfigDef实例,可通过它设置本地属性;在调用tap时,这些属性会经由所生成的cascading.flow.FlowProcess实例提供给使用方。
在configDef上设置的任何属性都不会出现在任何Flow或cascading.flow.FlowStep的进程级配置中,但在当前Tap实例中凡是传入了FlowProcess的方法调用里,这些属性会覆盖原有的同名配置值,#sourceConfInit(cascading.flow.FlowProcess,Object)和#sinkConfInit(cascading.flow.FlowProcess,Object)方法除外。
也就是说,confInit方法会在应用任何ConfigDef之前被调用,因此放入ConfigDef实例中的任何值对这些方法都不可见。

代码示例

代码示例来源:origin: cwensel/cascading

/**
 * Hands back the {@code ConfigDef} of the wrapped {@code original} instance
 * rather than maintaining one of its own.
 *
 * @return the value returned by {@code original.getConfigDef()}
 */
@Override
public ConfigDef getConfigDef()
 {
 final ConfigDef delegated = original.getConfigDef();

 return delegated;
 }

代码示例来源:origin: cwensel/cascading

/**
 * Forwards the request to the wrapped {@code original} instance, so callers
 * see the same {@code ConfigDef} that {@code original} exposes.
 *
 * @return the value returned by {@code original.getConfigDef()}
 */
@Override
public ConfigDef getConfigDef()
 {
 final ConfigDef wrappedConfigDef = original.getConfigDef();

 return wrappedConfigDef;
 }

代码示例来源:origin: cwensel/cascading

// Build an ElementFlowProcess from the current flowProcess and the trap's ConfigDef,
// so the properties set on the trap are passed along with the process.
FlowProcess elementFlowProcess = new ElementFlowProcess( flowProcess, trap.getConfigDef() );
// Install a TrapHandler on the duct, constructed from that process, the flow element,
// the trap tap and the branch name.
elementDuct.setTrapHandler( new TrapHandler( elementFlowProcess, flowElement, trap, branchName ) );
// NOTE(review): the enclosing switch/loop is outside this excerpt — confirm what this break exits.
break;

代码示例来源:origin: cwensel/cascading

/**
 * Chooses the tap to place at the given pipe and decorates it.
 * <p>
 * A tap registered in the graph's checkpoint map under the pipe's name is used
 * (after checkpoint decoration) when present. Otherwise a new tap is made: for a
 * {@code Checkpoint} pipe it is created under {@code checkpointTapRootPath},
 * decorated as a checkpoint tap and flagged with the {@code cascading.checkpoint}
 * property; for any other pipe a plain temp tap is created.
 *
 * @param graph                     graph whose checkpoint map is consulted
 * @param pipe                      pipe the tap is being created for
 * @param defaultDecoratorClassName decorator class name handed to the final
 *                                  temporary-tap decoration step
 * @return the result of the final {@code decorateTap} call
 */
private Tap makeTempTap( FlowElementGraph graph, Pipe pipe, String defaultDecoratorClassName )
 {
 Tap tap = graph.getCheckpointsMap().get( pipe.getName() );

 if( tap != null )
  {
  LOG.info( "found checkpoint: {}, using tap: {}", pipe.getName(), tap );
  tap = decorateTap( pipe, tap, FlowConnectorProps.CHECKPOINT_TAP_DECORATOR_CLASS, null );
  }

 // re-checked on purpose: the decoration above may have left no tap behind
 if( tap == null )
  {
  if( !( pipe instanceof Checkpoint ) )
   {
   tap = makeTempTap( pipe.getName() );
   }
  else
   {
   // restarting is only supported from a checkpoint pipe or a checkpoint tap
   tap = makeTempTap( checkpointTapRootPath, pipe.getName() );
   tap = decorateTap( pipe, tap, FlowConnectorProps.CHECKPOINT_TAP_DECORATOR_CLASS, null );

   // flag the tap as an anonymous checkpoint
   tap.getConfigDef().setProperty( ConfigDef.Mode.DEFAULT, "cascading.checkpoint", "true" );
   }
  }

 return decorateTap( pipe, tap, FlowConnectorProps.TEMPORARY_TAP_DECORATOR_CLASS, defaultDecoratorClassName );
 }

代码示例来源:origin: cascading/cascading-platform

/**
 * Runs a flow whose sink tap carries tap-, node- and step-level ConfigDef
 * properties, then checks that the sink resource exists once the flow completes.
 *
 * @throws IOException if one of the invoked platform or flow operations raises it
 */
@Test
public void testTapSinkConfigDef() throws IOException
 {
 getPlatform().copyFromLocal( inputFileNums20 );

 Tap input = getPlatform().getTextFile( new Fields( "line" ), inputFileNums20, SinkMode.KEEP );

 Pipe head = new Pipe( "test" );
 Pipe assembly = new Each( head, new Insert( new Fields( "value" ), "nada" ), Fields.ALL );

 Scheme configDefScheme = getPlatform().getTestConfigDefScheme();
 Tap output = getPlatform().getTap( configDefScheme, getOutputPath( "tapsinkconfigdef" ), SinkMode.REPLACE );

 // notes carried over from the original test:
 //  process -> after sink/sourceConfInit are called
 //  default -> wrapper for all cluster side calls

 // key "default": tap-level value first ...
 output.getConfigDef().setProperty( Mode.DEFAULT, "default", "sink-default" );
 // ... then the step-level value, which steps on the one above
 output.getStepConfigDef().setProperty( Mode.DEFAULT, "default", "process-default" );

 // key "replace" on the tap-level ConfigDef: DEFAULT followed by REPLACE
 output.getConfigDef().setProperty( Mode.DEFAULT, "replace", "sink-default" );
 output.getConfigDef().setProperty( Mode.REPLACE, "replace", "sink-replace" );

 // key "default-node" on the node-level ConfigDef
 output.getNodeConfigDef().setProperty( Mode.REPLACE, "default-node", "node-replace" );

 // the same keys registered on the step-level ConfigDef
 output.getStepConfigDef().setProperty( Mode.DEFAULT, "replace", "process-default" );
 output.getStepConfigDef().setProperty( Mode.REPLACE, "replace", "process-replace" );
 output.getStepConfigDef().setProperty( Mode.DEFAULT, "default-node", "process-default" );

 Flow flow = getPlatform().getFlowConnector().connect( input, output, assembly );

 flow.complete();

 assertTrue( flow.resourceExists( output ) );
 }

代码示例来源:origin: cascading/cascading-platform

/**
 * Runs a flow whose source tap carries tap-, node- and step-level ConfigDef
 * properties, then checks that the sink resource exists once the flow completes.
 *
 * @throws IOException if one of the invoked platform or flow operations raises it
 */
@Test
public void testTapSourceConfigDef() throws IOException
 {
 getPlatform().copyFromLocal( inputFileNums20 );

 // the assembly does not depend on the taps, so it is built up front
 Pipe head = new Pipe( "test" );
 Pipe assembly = new Each( head, new Insert( new Fields( "value" ), "nada" ), Fields.ALL );

 Scheme configDefScheme = getPlatform().getTestConfigDefScheme();
 Tap input = getPlatform().getTap( configDefScheme, inputFileNums20, SinkMode.KEEP );

 // notes carried over from the original test:
 //  process -> after sink/sourceConfInit are called
 //  default -> wrapper for all cluster side calls

 // key "default": tap-level value first ...
 input.getConfigDef().setProperty( Mode.DEFAULT, "default", "source-default" );
 // ... then the step-level value, which steps on the one above
 input.getStepConfigDef().setProperty( Mode.DEFAULT, "default", "process-default" );

 // key "replace" on the tap-level ConfigDef: DEFAULT followed by REPLACE
 input.getConfigDef().setProperty( Mode.DEFAULT, "replace", "source-default" );
 input.getConfigDef().setProperty( Mode.REPLACE, "replace", "source-replace" );

 // key "default-node" on the node-level ConfigDef
 input.getNodeConfigDef().setProperty( Mode.REPLACE, "default-node", "node-replace" );

 // the same keys registered on the step-level ConfigDef
 input.getStepConfigDef().setProperty( Mode.DEFAULT, "replace", "process-default" );
 input.getStepConfigDef().setProperty( Mode.REPLACE, "replace", "process-replace" );
 input.getStepConfigDef().setProperty( Mode.DEFAULT, "default-node", "process-default" );

 Tap output = getPlatform().getTextFile( getOutputPath( "tapsourceconfigdef" ), SinkMode.REPLACE );

 Flow flow = getPlatform().getFlowConnector().connect( input, output, assembly );

 flow.complete();

 assertTrue( flow.resourceExists( output ) );
 }

代码示例来源:origin: cwensel/cascading

/**
 * Configures tap-, node- and step-level ConfigDef properties on a source tap,
 * runs a flow reading from it, and verifies the sink resource exists afterwards.
 *
 * @throws IOException if one of the invoked platform or flow operations raises it
 */
@Test
public void testTapSourceConfigDef() throws IOException
 {
 getPlatform().copyFromLocal( inputFileNums20 );

 // the assembly does not depend on the taps, so it is built up front
 Pipe testPipe = new Pipe( "test" );
 Pipe withValue = new Each( testPipe, new Insert( new Fields( "value" ), "nada" ), Fields.ALL );

 Scheme testScheme = getPlatform().getTestConfigDefScheme();
 Tap sourceTap = getPlatform().getTap( testScheme, inputFileNums20, SinkMode.KEEP );

 // notes carried over from the original test:
 //  process -> after sink/sourceConfInit are called
 //  default -> wrapper for all cluster side calls

 // key "default": tap-level value first ...
 sourceTap.getConfigDef().setProperty( Mode.DEFAULT, "default", "source-default" );
 // ... then the step-level value, which steps on the one above
 sourceTap.getStepConfigDef().setProperty( Mode.DEFAULT, "default", "process-default" );

 // key "replace" on the tap-level ConfigDef: DEFAULT followed by REPLACE
 sourceTap.getConfigDef().setProperty( Mode.DEFAULT, "replace", "source-default" );
 sourceTap.getConfigDef().setProperty( Mode.REPLACE, "replace", "source-replace" );

 // key "default-node" on the node-level ConfigDef
 sourceTap.getNodeConfigDef().setProperty( Mode.REPLACE, "default-node", "node-replace" );

 // the same keys registered on the step-level ConfigDef
 sourceTap.getStepConfigDef().setProperty( Mode.DEFAULT, "replace", "process-default" );
 sourceTap.getStepConfigDef().setProperty( Mode.REPLACE, "replace", "process-replace" );
 sourceTap.getStepConfigDef().setProperty( Mode.DEFAULT, "default-node", "process-default" );

 Tap sinkTap = getPlatform().getTextFile( getOutputPath( "tapsourceconfigdef" ), SinkMode.REPLACE );

 Flow flow = getPlatform().getFlowConnector().connect( sourceTap, sinkTap, withValue );

 flow.complete();

 assertTrue( flow.resourceExists( sinkTap ) );
 }

代码示例来源:origin: cwensel/cascading

/**
 * Configures tap-, node- and step-level ConfigDef properties on a sink tap,
 * runs a flow writing to it, and verifies the sink resource exists afterwards.
 *
 * @throws IOException if one of the invoked platform or flow operations raises it
 */
@Test
public void testTapSinkConfigDef() throws IOException
 {
 getPlatform().copyFromLocal( inputFileNums20 );

 Tap sourceTap = getPlatform().getTextFile( new Fields( "line" ), inputFileNums20, SinkMode.KEEP );

 Pipe testPipe = new Pipe( "test" );
 Pipe withValue = new Each( testPipe, new Insert( new Fields( "value" ), "nada" ), Fields.ALL );

 Scheme testScheme = getPlatform().getTestConfigDefScheme();
 Tap sinkTap = getPlatform().getTap( testScheme, getOutputPath( "tapsinkconfigdef" ), SinkMode.REPLACE );

 // notes carried over from the original test:
 //  process -> after sink/sourceConfInit are called
 //  default -> wrapper for all cluster side calls

 // key "default": tap-level value first ...
 sinkTap.getConfigDef().setProperty( Mode.DEFAULT, "default", "sink-default" );
 // ... then the step-level value, which steps on the one above
 sinkTap.getStepConfigDef().setProperty( Mode.DEFAULT, "default", "process-default" );

 // key "replace" on the tap-level ConfigDef: DEFAULT followed by REPLACE
 sinkTap.getConfigDef().setProperty( Mode.DEFAULT, "replace", "sink-default" );
 sinkTap.getConfigDef().setProperty( Mode.REPLACE, "replace", "sink-replace" );

 // key "default-node" on the node-level ConfigDef
 sinkTap.getNodeConfigDef().setProperty( Mode.REPLACE, "default-node", "node-replace" );

 // the same keys registered on the step-level ConfigDef
 sinkTap.getStepConfigDef().setProperty( Mode.DEFAULT, "replace", "process-default" );
 sinkTap.getStepConfigDef().setProperty( Mode.REPLACE, "replace", "process-replace" );
 sinkTap.getStepConfigDef().setProperty( Mode.DEFAULT, "default-node", "process-default" );

 Flow flow = getPlatform().getFlowConnector().connect( sourceTap, sinkTap, withValue );

 flow.complete();

 assertTrue( flow.resourceExists( sinkTap ) );
 }

代码示例来源:origin: cwensel/cascading

/**
 * Runs a flow whose second operation is marked "always fail", with a trap tap
 * whose own ConfigDef is populated from {@code TrapProps} with all diagnostics
 * recorded, then validates the lengths of the flow output and of the trap output.
 *
 * @throws Exception if any of the invoked platform, flow or validation calls raises it
 */
@Test
public void testTrapDiagnosticsLocalConfig() throws Exception
 {
 getPlatform().copyFromLocal( inputFileApache );

 Tap input = getPlatform().getTextFile( inputFileApache );

 Pipe head = new Pipe( "map" );
 Pipe parsed = new Each( head, new Fields( "line" ), new RegexParser( new Fields( "ip" ), "^[^ ]*" ), new Fields( "ip" ) );

 // always fail
 Pipe failing = new Each( parsed, new Fields( "ip" ), new TestFunction( new Fields( "test" ), null ), Fields.ALL );

 Pipe grouped = new GroupBy( "reduce", failing, new Fields( "ip" ) );
 Pipe counted = new Every( grouped, new Count(), new Fields( "ip", "count" ) );

 Tap output = getPlatform().getTextFile( getOutputPath( "diagconfigdef/tap" + NONDETERMINISTIC ), SinkMode.REPLACE );
 Tap trapTap = getPlatform().getTabDelimitedFile( Fields.ALL, getOutputPath( "diagconfigdef/trap" + NONDETERMINISTIC ), SinkMode.REPLACE );

 Map<Object, Object> properties = getProperties();

 // the diagnostics settings go onto the trap's own ConfigDef, not into the properties map
 TrapProps.trapProps().recordAllDiagnostics().setProperties( trapTap.getConfigDef(), ConfigDef.Mode.DEFAULT );

 Flow flow = getPlatform().getFlowConnector( properties ).connect( "trap test", input, output, trapTap, counted );

 flow.complete();

 validateLength( flow, 0 );

 // 4 columns, not 1
 Pattern expectedContent = Pattern.compile( ".*TrapPlatformTest.*" );
 validateLength( flow.openTrap(), 10, 4, expectedContent );
 }

代码示例来源:origin: cascading/cascading-platform

/**
 * Builds a flow containing an operation marked "always fail", attaches a trap tap
 * whose ConfigDef receives the {@code TrapProps} settings with all diagnostics
 * recorded, and validates the lengths of both the flow output and the trap output.
 *
 * @throws Exception if any of the invoked platform, flow or validation calls raises it
 */
@Test
public void testTrapDiagnosticsLocalConfig() throws Exception
 {
 getPlatform().copyFromLocal( inputFileApache );

 Tap sourceTap = getPlatform().getTextFile( inputFileApache );

 Pipe mapPipe = new Pipe( "map" );
 Pipe ipPipe = new Each( mapPipe, new Fields( "line" ), new RegexParser( new Fields( "ip" ), "^[^ ]*" ), new Fields( "ip" ) );

 // always fail
 Pipe failPipe = new Each( ipPipe, new Fields( "ip" ), new TestFunction( new Fields( "test" ), null ), Fields.ALL );

 Pipe reducePipe = new GroupBy( "reduce", failPipe, new Fields( "ip" ) );
 Pipe countPipe = new Every( reducePipe, new Count(), new Fields( "ip", "count" ) );

 Tap sinkTap = getPlatform().getTextFile( getOutputPath( "diagconfigdef/tap" + NONDETERMINISTIC ), SinkMode.REPLACE );
 Tap diagnosticsTrap = getPlatform().getTabDelimitedFile( Fields.ALL, getOutputPath( "diagconfigdef/trap" + NONDETERMINISTIC ), SinkMode.REPLACE );

 Map<Object, Object> properties = getProperties();

 // the diagnostics settings go onto the trap's own ConfigDef, not into the properties map
 TrapProps.trapProps().recordAllDiagnostics().setProperties( diagnosticsTrap.getConfigDef(), ConfigDef.Mode.DEFAULT );

 Flow flow = getPlatform().getFlowConnector( properties ).connect( "trap test", sourceTap, sinkTap, diagnosticsTrap, countPipe );

 flow.complete();

 validateLength( flow, 0 );

 // 4 columns, not 1
 Pattern trapLinePattern = Pattern.compile( ".*TrapPlatformTest.*" );
 validateLength( flow.openTrap(), 10, 4, trapLinePattern );
 }

相关文章