Usage of the cascading.tap.Tap.sourceConfInit() Method, with Code Examples

x33g5p2x · reposted 2022-01-30 under Other

This article collects Java code examples of the cascading.tap.Tap.sourceConfInit() method and shows how it is used in practice. The examples are drawn from selected open-source projects on platforms such as GitHub, Stack Overflow, and Maven, and should serve as useful references. Details of Tap.sourceConfInit() are as follows:
Package: cascading.tap
Class name: Tap
Method name: sourceConfInit

Introduction to Tap.sourceConfInit

Method sourceConfInit initializes this instance as a source.

This method may be called more than once if this Tap instance is used outside the scope of a cascading.flow.Flow instance, or if it participates multiple times in a given Flow or across different Flows in a cascading.cascade.Cascade.

In the context of a Flow, it will be called after cascading.flow.FlowListener#onStarting(cascading.flow.Flow).

Note that no resources or services should be modified by this method.
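
Before turning to the collected examples, here is a minimal sketch of the most common way sourceConfInit is overridden: a decorating Tap that forwards the call to a wrapped instance, the same delegation pattern several of the examples below use. The DelegatingTap class and its delegate field are illustrative names, not taken from any of the quoted projects, and the signatures assume the Cascading 3.x API seen in the cwensel/cascading examples.

import java.io.IOException;

import cascading.flow.FlowProcess;
import cascading.tap.Tap;
import cascading.tuple.TupleEntryCollector;
import cascading.tuple.TupleEntryIterator;

// Illustrative sketch only: a Tap decorator that forwards every call to a
// wrapped Tap. Class and field names are hypothetical, not from Cascading.
public class DelegatingTap<Config, Input, Output> extends Tap<Config, Input, Output>
 {
 private final Tap<Config, Input, Output> delegate;

 public DelegatingTap( Tap<Config, Input, Output> delegate )
  {
  super( delegate.getScheme() );
  this.delegate = delegate;
  }

 @Override
 public void sourceConfInit( FlowProcess<? extends Config> flowProcess, Config conf )
  {
  // per the contract above this may run more than once, so it only
  // touches the passed configuration, never external resources
  delegate.sourceConfInit( flowProcess, conf );
  }

 @Override
 public String getIdentifier()
  {
  return delegate.getIdentifier();
  }

 @Override
 public TupleEntryIterator openForRead( FlowProcess<? extends Config> flowProcess, Input input ) throws IOException
  {
  return delegate.openForRead( flowProcess, input );
  }

 @Override
 public TupleEntryCollector openForWrite( FlowProcess<? extends Config> flowProcess, Output output ) throws IOException
  {
  return delegate.openForWrite( flowProcess, output );
  }

 @Override
 public boolean createResource( Config conf ) throws IOException
  {
  return delegate.createResource( conf );
  }

 @Override
 public boolean deleteResource( Config conf ) throws IOException
  {
  return delegate.deleteResource( conf );
  }

 @Override
 public boolean resourceExists( Config conf ) throws IOException
  {
  return delegate.resourceExists( conf );
  }

 @Override
 public long getModifiedTime( Config conf ) throws IOException
  {
  return delegate.getModifiedTime( conf );
  }
 }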

Code Examples

Code example source: origin: elastic/elasticsearch-hadoop

@Override
public void sourceConfInit(FlowProcess<Object> flowProcess, Object conf) {
  initInnerTapIfNotSetFromFlowProcess(flowProcess);
  actualTap.sourceConfInit(flowProcess, conf);
}

Code example source: origin: cwensel/cascading

@Override
public void sourceConfInit( FlowProcess<? extends Config> flowProcess, Config conf )
 {
 original.sourceConfInit( flowProcess, conf );
 }

Code example source: origin: cwensel/cascading

@Override
public void sourceConfInit( FlowProcess<? extends TConfig> flowProcess, TConfig conf )
 {
 original.sourceConfInit( processProvider.apply( flowProcess ), configProvider.apply( conf ) );
 }

Code example source: origin: cwensel/cascading

@Override
public void sourceConfInit( FlowProcess<? extends Config> process, Config conf )
 {
 for( Tap tap : getTaps() )
  tap.sourceConfInit( process, conf );
 }

Code example source: origin: org.elasticsearch/elasticsearch-hadoop

@Override
public void sourceConfInit(FlowProcess<Object> flowProcess, Object conf) {
  initInnerTapIfNotSetFromFlowProcess(flowProcess);
  actualTap.sourceConfInit(flowProcess, conf);
}

Code example source: origin: com.hotels/plunger

private TupleEntryIterator getHadoopTupleEntryIterator() throws IOException {
 @SuppressWarnings("unchecked")
 Tap<JobConf, ?, ?> hadoopTap = (Tap<JobConf, ?, ?>) source;
 JobConf conf = new JobConf();
 FlowProcess<JobConf> flowProcess = new HadoopFlowProcess(conf);
 hadoopTap.sourceConfInit(flowProcess, conf);
 return hadoopTap.openForRead(flowProcess);
}

Code example source: origin: com.hotels/plunger

private TupleEntryIterator getLocalTupleEntryIterator() throws IOException {
 @SuppressWarnings("unchecked")
 Tap<Properties, ?, ?> localTap = (Tap<Properties, ?, ?>) source;
 Properties properties = new Properties();
 FlowProcess<Properties> flowProcess = new LocalFlowProcess(properties);
 localTap.sourceConfInit(flowProcess, properties);
 return localTap.openForRead(flowProcess);
}
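
The two com.hotels/plunger helpers above illustrate the general pattern for reading from a Tap outside of a Flow: build a configuration, wrap it in a FlowProcess, call sourceConfInit, then openForRead. The following is a rough, self-contained sketch of the same pattern on the cascading-local platform; the class name and the input.txt path are placeholders, not from the quoted project.

import java.util.Properties;

import cascading.flow.FlowProcess;
import cascading.flow.local.LocalFlowProcess;
import cascading.scheme.local.TextLine;
import cascading.tap.Tap;
import cascading.tap.local.FileTap;
import cascading.tuple.TupleEntryIterator;

public class ReadTapOutsideFlow
 {
 public static void main( String[] args ) throws Exception
  {
  // placeholder input path, purely for illustration
  Tap<Properties, ?, ?> tap = new FileTap( new TextLine(), "input.txt" );
  Properties properties = new Properties();
  FlowProcess<Properties> flowProcess = new LocalFlowProcess( properties );
  // initialize the tap as a source before reading from it directly
  tap.sourceConfInit( flowProcess, properties );
  try( TupleEntryIterator iterator = tap.openForRead( flowProcess ) )
   {
   while( iterator.hasNext() )
    System.out.println( iterator.next().getTuple() );
   }
  }
 }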

Code example source: origin: cwensel/cascading

protected void initTaps( FlowProcess<Properties> flowProcess, Properties conf, Set<Tap> taps, boolean isSink )
 {
 if( !taps.isEmpty() )
  {
  for( Tap tap : taps )
   {
   Properties confCopy = flowProcess.copyConfig( conf );
   tapProperties.put( tap, confCopy ); // todo: store the diff, not the copy
   if( isSink )
    tap.sinkConfInit( flowProcess, confCopy );
   else
    tap.sourceConfInit( flowProcess, confCopy );
   }
  }
 }

Code example source: origin: cwensel/cascading

protected void sourceConfInitComplete( FlowProcess<? extends Configuration> process, Configuration conf )
 {
 super.sourceConfInit( process, conf );
 TupleSerialization.setSerializations( conf ); // allows Hfs to be used independent of Flow
 // use CombineFileInputFormat if that is enabled
 handleCombineFileInputFormat( conf );
 }

Code example source: origin: cascading/cascading-jdbc-core

@Override
public void sourceConfInit( FlowProcess<? extends Configuration> process, Configuration conf )
 {
 if( username == null )
  DBConfiguration.configureDB( conf, driverClassName, connectionUrl );
 else
  DBConfiguration.configureDB( conf, driverClassName, connectionUrl, username, password );
 super.sourceConfInit( process, conf );
 }

Code example source: origin: com.twitter/maple

@Override
public void sourceConfInit(FlowProcess<JobConf> process, JobConf conf) {
 // a hack for MultiInputFormat to see that there is a child format
 FileInputFormat.setInputPaths( conf, getPath() );
 if(quorumNames != null) {
  conf.set("hbase.zookeeper.quorum", quorumNames);
 }
 LOG.debug("sourcing from table: {}", tableName);
 TableInputFormat.setTableName(conf, tableName);
 super.sourceConfInit(process, conf);
}

Code example source: origin: cascading/cascading-hadoop2-io

protected void sourceConfInitComplete( FlowProcess<? extends Configuration> process, Configuration conf )
 {
 super.sourceConfInit( process, conf );
 TupleSerialization.setSerializations( conf ); // allows Hfs to be used independent of Flow
 // use CombineFileInputFormat if that is enabled
 handleCombineFileInputFormat( conf );
 }

Code example source: origin: cwensel/cascading

// excerpt (reconstructed): each source tap is checked for an identifier and
// then initialized against the current configuration
if( tap.getIdentifier() == null )
 throw new IllegalStateException( "tap may not have null identifier: " + tap.toString() );

tap.sourceConfInit( flowProcess, current );

Code example source: origin: cwensel/cascading

// excerpt: streamed source taps are initialized against their own per-tap job
// config, accumulated source taps against the shared accumulated job config
tap.sourceConfInit( flowProcess, streamedJobs[ i ] );
tap.sourceConfInit( flowProcess, accumulatedJob );

Code example source: origin: cascading/cascading-hadoop2-mr1

// excerpt: streamed source taps are initialized against their own per-tap job
// config, accumulated source taps against the shared accumulated job config
tap.sourceConfInit( flowProcess, streamedJobs[ i ] );
tap.sourceConfInit( flowProcess, accumulatedJob );

Code example source: origin: dataArtisans/cascading-flink

private DataSet<Tuple> translateSource(FlowProcess flowProcess, ExecutionEnvironment env, FlowNode node, int dop) {
  Tap tap = this.getSingle(node.getSourceTaps());
  JobConf tapConfig = new JobConf(this.getNodeConfig(node));
  tap.sourceConfInit(flowProcess, tapConfig);
  tapConfig.set( "cascading.step.source", Tap.id( tap ) );
  Fields outFields = tap.getSourceFields();
  registerKryoTypes(outFields);
  JobConf sourceConfig = new JobConf(this.getNodeConfig(node));
  MultiInputFormat.addInputFormat(sourceConfig, tapConfig);
  DataSet<Tuple> src = env
      .createInput(new TapInputFormat(node), new TupleTypeInfo(outFields))
          .name(tap.getIdentifier())
          .setParallelism(dop)
          .withParameters(FlinkConfigConverter.toFlinkConfig(new Configuration(sourceConfig)));
  return src;
}

Code example source: origin: cascading/cascading-hadoop2-io

private void initialize() throws IOException
 {
 // prevent collisions of configuration properties set client side if now cluster side
 String property = flowProcess.getStringProperty( "cascading.node.accumulated.source.conf." + Tap.id( tap ) );
 if( property == null )
  {
  // default behavior is to accumulate paths, so remove any set prior
  conf = HadoopUtil.removePropertiesFrom( conf, "mapred.input.dir", "mapreduce.input.fileinputformat.inputdir" ); // hadoop2
  tap.sourceConfInit( flowProcess, conf );
  }
 JobConf jobConf = asJobConfInstance( conf );
 inputFormat = jobConf.getInputFormat();
 if( inputFormat instanceof JobConfigurable )
  ( (JobConfigurable) inputFormat ).configure( jobConf );
 // do not test for existence, let hadoop decide how to handle the given path
 // this delegates globbing to the inputformat on split generation.
 splits = inputFormat.getSplits( jobConf, 1 );
 if( splits.length == 0 )
  complete = true;
 }

Code example source: origin: cwensel/cascading

private void initialize() throws IOException
 {
 // prevent collisions of configuration properties set client side if now cluster side
 String property = flowProcess.getStringProperty( "cascading.node.accumulated.source.conf." + Tap.id( tap ) );
 if( property == null )
  {
  // default behavior is to accumulate paths, so remove any set prior
  conf = HadoopUtil.removePropertiesFrom( conf, "mapred.input.dir", "mapreduce.input.fileinputformat.inputdir" ); // hadoop2
  tap.sourceConfInit( flowProcess, conf );
  }
 JobConf jobConf = asJobConfInstance( conf );
 inputFormat = jobConf.getInputFormat();
 if( inputFormat instanceof JobConfigurable )
  ( (JobConfigurable) inputFormat ).configure( jobConf );
 // do not test for existence, let hadoop decide how to handle the given path
 // this delegates globbing to the inputformat on split generation.
 splits = inputFormat.getSplits( jobConf, 1 );
 if( splits.length == 0 )
  complete = true;
 }

Code example source: origin: cwensel/cascading

@Test
public void testCombinedHfs() throws Exception
 {
 getPlatform().copyFromLocal( inputFileLower );
 getPlatform().copyFromLocal( inputFileUpper );
 Hfs sourceLower = new Hfs( new TextLine( new Fields( "offset", "line" ) ), InputData.inputFileLower );
 Hfs sourceUpper = new Hfs( new TextLine( new Fields( "offset", "line" ) ), InputData.inputFileUpper );
 // create a CombinedHfs instance on these files
 Tap source = new MultiSourceTap<Hfs, JobConf, RecordReader>( sourceLower, sourceUpper );
 FlowProcess<JobConf> process = getPlatform().getFlowProcess();
 JobConf conf = process.getConfigCopy();
 // set the combine flag
 conf.setBoolean( HfsProps.COMBINE_INPUT_FILES, true );
 conf.set( "cascading.flow.platform", "hadoop" ); // only supported on mr based platforms
 // test the input format and the split
 source.sourceConfInit( process, conf );
 InputFormat inputFormat = conf.getInputFormat();
 assertEquals( Hfs.CombinedInputFormat.class, inputFormat.getClass() );
 InputSplit[] splits = inputFormat.getSplits( conf, 1 );
 assertEquals( 1, splits.length );
 validateLength( source.openForRead( process ), 10 );
 }

Code example source: origin: cascading/cascading-hadoop2-mr1

@Test
public void testCombinedHfs() throws Exception
 {
 getPlatform().copyFromLocal( inputFileLower );
 getPlatform().copyFromLocal( inputFileUpper );
 Hfs sourceLower = new Hfs( new TextLine( new Fields( "offset", "line" ) ), InputData.inputFileLower );
 Hfs sourceUpper = new Hfs( new TextLine( new Fields( "offset", "line" ) ), InputData.inputFileUpper );
 // create a CombinedHfs instance on these files
 Tap source = new MultiSourceTap<Hfs, JobConf, RecordReader>( sourceLower, sourceUpper );
 FlowProcess<JobConf> process = getPlatform().getFlowProcess();
 JobConf conf = process.getConfigCopy();
 // set the combine flag
 conf.setBoolean( HfsProps.COMBINE_INPUT_FILES, true );
 conf.set( "cascading.flow.platform", "hadoop" ); // only supported on mr based platforms
 // test the input format and the split
 source.sourceConfInit( process, conf );
 InputFormat inputFormat = conf.getInputFormat();
 assertEquals( Hfs.CombinedInputFormat.class, inputFormat.getClass() );
 InputSplit[] splits = inputFormat.getSplits( conf, 1 );
 assertEquals( 1, splits.length );
 validateLength( source.openForRead( process ), 10 );
 }
