Usage and code examples of the cascading.tap.Tap.sinkConfInit() method


This article collects Java code examples of the cascading.tap.Tap.sinkConfInit() method and shows how it is used in practice. The examples are drawn from selected open-source projects hosted on GitHub/Stackoverflow/Maven and should serve as useful references. Details of Tap.sinkConfInit() are as follows:

Class path: cascading.tap.Tap
Class name: Tap
Method name: sinkConfInit

Tap.sinkConfInit overview

Method sinkConfInit initializes this instance as a sink.

This method may be called more than once if this Tap instance is used outside the scope of a cascading.flow.Flow instance, or if it participates multiple times in a given Flow or across different Flows in a cascading.cascade.Cascade.

Note this method will be called in the context of this Tap being used as a traditional 'sink' and as a 'trap'.

In the context of a Flow, it will be called after cascading.flow.FlowListener#onStarting(cascading.flow.Flow).

Note that no resources or services should be modified by this method. If this Tap instance returns true for #isReplace(), then #deleteResource(Object) will be called by the parent Flow.
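As a quick illustration of the contract described above, here is a minimal sketch that is not taken from any of the projects listed below; the class name AuditedHfs and the property key com.example.audit.sink are hypothetical, and Cascading 3's Hadoop APIs (Configuration-based Hfs) are assumed. The override only mutates the passed-in job configuration and then delegates to the parent implementation:

import org.apache.hadoop.conf.Configuration;

import cascading.flow.FlowProcess;
import cascading.scheme.hadoop.TextLine;
import cascading.tap.hadoop.Hfs;
import cascading.tuple.Fields;

public class AuditedHfs extends Hfs
 {
 public AuditedHfs( String stringPath )
  {
  super( new TextLine( new Fields( "line" ) ), stringPath );
  }

 @Override
 public void sinkConfInit( FlowProcess<? extends Configuration> flowProcess, Configuration conf )
  {
  // hypothetical property; only the passed-in configuration is modified here,
  // never the underlying resource itself
  conf.set( "com.example.audit.sink", getIdentifier() );

  // let Hfs set the output path, serializations, etc.
  super.sinkConfInit( flowProcess, conf );
  }
 }

Because sinkConfInit may run more than once (for instance once as a sink and again as a trap), the override is kept idempotent, and any resource deletion is left to the parent Flow via isReplace()/deleteResource().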

Code examples

Code example source: elastic/elasticsearch-hadoop

@Override
public void sinkConfInit(FlowProcess<Object> flowProcess, Object conf) {
  initInnerTapIfNotSetFromFlowProcess(flowProcess);
  actualTap.sinkConfInit(flowProcess, conf);
}

Code example source: cwensel/cascading

@Override
public void sinkConfInit( FlowProcess<? extends Config> flowProcess, Config conf )
 {
 original.sinkConfInit( flowProcess, conf );
 }

Code example source: cwensel/cascading

@Override
public void sinkConfInit( FlowProcess<? extends TConfig> flowProcess, TConfig conf )
 {
 original.sinkConfInit( processProvider.apply( flowProcess ), configProvider.apply( conf ) );
 }

Code example source: cwensel/cascading

private void initFromTraps( FlowProcess<JobConf> flowProcess, JobConf conf, Map<String, Tap> traps )
 {
 if( !traps.isEmpty() )
  {
  JobConf trapConf = HadoopUtil.copyJobConf( conf );
  for( Tap tap : traps.values() )
   tap.sinkConfInit( flowProcess, trapConf );
  }
 }

Code example source: cascading/cascading-hadoop2-mr1

private void initFromTraps( FlowProcess<JobConf> flowProcess, JobConf conf, Map<String, Tap> traps )
 {
 if( !traps.isEmpty() )
  {
  JobConf trapConf = HadoopUtil.copyJobConf( conf );
  for( Tap tap : traps.values() )
   tap.sinkConfInit( flowProcess, trapConf );
  }
 }

Code example source: org.elasticsearch/elasticsearch-hadoop

@Override
public void sinkConfInit(FlowProcess<Object> flowProcess, Object conf) {
  initInnerTapIfNotSetFromFlowProcess(flowProcess);
  actualTap.sinkConfInit(flowProcess, conf);
}

Code example source: cwensel/cascading

protected void initFromTraps( FlowNode flowNode, FlowProcess<? extends Configuration> flowProcess, Configuration conf )
 {
 Map<String, Tap> traps = flowNode.getTrapMap();
 if( !traps.isEmpty() )
  {
  JobConf trapConf = new JobConf( conf );
  for( Tap tap : traps.values() )
   {
   tap.sinkConfInit( flowProcess, trapConf );
   setLocalMode( conf, trapConf, tap );
   }
  }
 }

Code example source: cascading/cascading-hadoop2-tez

protected void initFromTraps( FlowNode flowNode, FlowProcess<? extends Configuration> flowProcess, Configuration conf )
 {
 Map<String, Tap> traps = flowNode.getTrapMap();
 if( !traps.isEmpty() )
  {
  JobConf trapConf = new JobConf( conf );
  for( Tap tap : traps.values() )
   {
   tap.sinkConfInit( flowProcess, trapConf );
   setLocalMode( conf, trapConf, tap );
   }
  }
 }

Code example source: cwensel/cascading

protected void initTaps( FlowProcess<Properties> flowProcess, Properties conf, Set<Tap> taps, boolean isSink )
 {
 if( !taps.isEmpty() )
  {
  for( Tap tap : taps )
   {
   Properties confCopy = flowProcess.copyConfig( conf );
   tapProperties.put( tap, confCopy ); // todo: store the diff, not the copy
   if( isSink )
    tap.sinkConfInit( flowProcess, confCopy );
   else
    tap.sourceConfInit( flowProcess, confCopy );
   }
  }
 }

Code example source: com.twitter/maple

private void initialize() throws IOException {
 tap.sinkConfInit(hadoopFlowProcess, conf);
 OutputFormat outputFormat = conf.getOutputFormat();
 LOG.info("Output format class is: " + outputFormat.getClass().toString());
 writer = outputFormat.getRecordWriter(null, conf, tap.getIdentifier(), Reporter.NULL);
 sinkCall.setOutput(this);
}

Code example source: cwensel/cascading

private void bridge( FlowProcess flowProcess, Object conf )
 {
 childConfigs = new ArrayList<>();
 for( int i = 0; i < getTaps().length; i++ )
  {
  Tap tap = getTaps()[ i ];
  Object newConfig = flowProcess.copyConfig( conf );
  tap.sinkConfInit( flowProcess, newConfig );
  childConfigs.add( flowProcess.diffConfigIntoMap( conf, newConfig ) );
  }
 }

Code example source: cascading/cascading-hadoop2-mr1

protected void initFromSink( FlowProcess<JobConf> flowProcess, JobConf conf )
 {
 // init sink first so tempSink can take precedence
 if( getSink() != null )
  getSink().sinkConfInit( flowProcess, conf );
 Class<? extends OutputFormat> outputFormat = conf.getClass( "mapred.output.format.class", null, OutputFormat.class );
 boolean isFileOutputFormat = false;
 if( outputFormat != null )
  isFileOutputFormat = FileOutputFormat.class.isAssignableFrom( outputFormat );
 Path outputPath = FileOutputFormat.getOutputPath( conf );
 // if no output path is set, we need to substitute an alternative if the OutputFormat is file based
 // PartitionTap won't set the output, but will set an OutputFormat
 // MultiSinkTap won't set the output or set the OutputFormat
 // Non file based OutputFormats don't have an output path, but do have an OutputFormat set (JDBCTap..)
 if( outputPath == null && ( isFileOutputFormat || outputFormat == null ) )
  tempSink = new TempHfs( conf, "tmp:/" + new Path( getSink().getIdentifier() ).toUri().getPath(), true );
 // tempSink exists because sink is writeDirect
 if( tempSink != null )
  tempSink.sinkConfInit( flowProcess, conf );
 }

Code example source: cwensel/cascading

protected void initFromSink( FlowProcess<JobConf> flowProcess, JobConf conf )
 {
 // init sink first so tempSink can take precedence
 if( getSink() != null )
  getSink().sinkConfInit( flowProcess, conf );
 Class<? extends OutputFormat> outputFormat = conf.getClass( "mapred.output.format.class", null, OutputFormat.class );
 boolean isFileOutputFormat = false;
 if( outputFormat != null )
  isFileOutputFormat = FileOutputFormat.class.isAssignableFrom( outputFormat );
 Path outputPath = FileOutputFormat.getOutputPath( conf );
 // if no output path is set, we need to substitute an alternative if the OutputFormat is file based
 // PartitionTap won't set the output, but will set an OutputFormat
 // MultiSinkTap won't set the output or set the OutputFormat
 // Non file based OutputFormats don't have an output path, but do have an OutputFormat set (JDBCTap..)
 if( outputPath == null && ( isFileOutputFormat || outputFormat == null ) )
  tempSink = new TempHfs( conf, "tmp:/" + new Path( getSink().getIdentifier() ).toUri().getPath(), true );
 // tempSink exists because sink is writeDirect
 if( tempSink != null )
  tempSink.sinkConfInit( flowProcess, conf );
 }

Code example source: cwensel/cascading

@Override
public void sinkConfInit( FlowProcess<? extends Configuration> process, Configuration conf )
 {
 Path qualifiedPath = new Path( getFullIdentifier( conf ) );
 HadoopUtil.setOutputPath( conf, qualifiedPath );
 super.sinkConfInit( process, conf );
 makeLocal( conf, qualifiedPath, "forcing job to stand-alone mode, via sink: " );
 TupleSerialization.setSerializations( conf ); // allows Hfs to be used independent of Flow
 }

Code example source: cascading/cascading-hadoop2-io

@Override
public void sinkConfInit( FlowProcess<? extends Configuration> process, Configuration conf )
 {
 Path qualifiedPath = new Path( getFullIdentifier( conf ) );
 HadoopUtil.setOutputPath( conf, qualifiedPath );
 super.sinkConfInit( process, conf );
 makeLocal( conf, qualifiedPath, "forcing job to stand-alone mode, via sink: " );
 TupleSerialization.setSerializations( conf ); // allows Hfs to be used independent of Flow
 }

Code example source: cascading/cascading-jdbc-core

@Override
public void sinkConfInit( FlowProcess<? extends Configuration> process, Configuration conf )
 {
 if( !isSink() )
  return;
 if( username == null )
  DBConfiguration.configureDB( conf, driverClassName, connectionUrl );
 else
  DBConfiguration.configureDB( conf, driverClassName, connectionUrl, username, password );
 super.sinkConfInit( process, conf );
 }

Code example source: cwensel/cascading

protected void initialize() throws IOException
 {
 tap.sinkConfInit( flowProcess, conf );
 OutputFormat outputFormat = asJobConfInstance( conf ).getOutputFormat();
 // todo: use OutputCommitter class
 isFileOutputFormat = outputFormat instanceof FileOutputFormat;
 if( isFileOutputFormat )
  {
  Hadoop18TapUtil.setupJob( conf );
  Hadoop18TapUtil.setupTask( conf );
  int partition = conf.getInt( "mapred.task.partition", conf.getInt( "mapreduce.task.partition", 0 ) );
  long localSequence = sequence == -1 ? 0 : sequence;
  if( prefix != null )
   filename = String.format( filenamePattern, prefix, "/", partition, localSequence );
  else
   filename = String.format( filenamePattern, "", "", partition, localSequence );
  }
 LOG.info( "creating path: {}", filename );
 writer = outputFormat.getRecordWriter( null, asJobConfInstance( conf ), filename, getReporter() );
 }

Code example source: cascading/cascading-hadoop2-io

protected void initialize() throws IOException
 {
 tap.sinkConfInit( flowProcess, conf );
 OutputFormat outputFormat = asJobConfInstance( conf ).getOutputFormat();
 // todo: use OutputCommitter class
 isFileOutputFormat = outputFormat instanceof FileOutputFormat;
 if( isFileOutputFormat )
  {
  Hadoop18TapUtil.setupJob( conf );
  Hadoop18TapUtil.setupTask( conf );
  int partition = conf.getInt( "mapred.task.partition", conf.getInt( "mapreduce.task.partition", 0 ) );
  long localSequence = sequence == -1 ? 0 : sequence;
  if( prefix != null )
   filename = String.format( filenamePattern, prefix, "/", partition, localSequence );
  else
   filename = String.format( filenamePattern, "", "", partition, localSequence );
  }
 LOG.info( "creating path: {}", filename );
 writer = outputFormat.getRecordWriter( null, asJobConfInstance( conf ), filename, getReporter() );
 }

Code example source: com.hotels/plunger

private void writeToHadoopTap(Tap<?, ?, ?> tap) throws IOException {
 @SuppressWarnings("unchecked")
 Tap<JobConf, ?, ?> hadoopTap = (Tap<JobConf, ?, ?>) tap;
 JobConf conf = new JobConf();
 HadoopFlowProcess flowProcess = new HadoopFlowProcess(conf);
 hadoopTap.sinkConfInit(flowProcess, conf);
 TupleEntryCollector collector = hadoopTap.openForWrite(flowProcess);
 for (TupleEntry tuple : data.asTupleEntryList()) {
  collector.add(tuple);
 }
 collector.close();
}

Code example source: com.hotels/plunger

private void writeToLocalTap(Tap<?, ?, ?> tap) throws IOException {
 @SuppressWarnings("unchecked")
 Tap<Properties, ?, ?> localTap = (Tap<Properties, ?, ?>) tap;
 Properties conf = new Properties();
 LocalFlowProcess flowProcess = new LocalFlowProcess(conf);
 flowProcess.setStepStats(new LocalStepStats(new NullFlowStep(), NullClientState.INSTANCE));
 localTap.sinkConfInit(flowProcess, conf);
 TupleEntryCollector collector = localTap.openForWrite(flowProcess);
 for (TupleEntry tuple : data.asTupleEntryList()) {
  collector.add(tuple);
 }
 collector.close();
 localTap.commitResource(conf);
}
