
x33g5p2x  于2022-01-30 转载在 其他  



[英]Method sinkConfInit initializes this instance as a sink.

This method may be called more than once if this Tap instance is used outside the scope of a cascading.flow.Flow instance or if it participates multiple times in a given Flow or across different Flows in a cascading.cascade.Cascade.

Note this method will be called in context of this Tap being used as a traditional 'sink' and as a 'trap'.

In the context of a Flow, it will be called after cascading.flow.FlowListener#onStarting(cascading.flow.Flow).

Note that no resources or services should be modified by this method. If this Tap instance returns true for #isReplace(), then #deleteResource(Object) will be called by the parent Flow.


代码示例来源:origin: elastic/elasticsearch-hadoop

/**
 * Initializes this tap as a sink by forwarding the call to the wrapped {@code actualTap}.
 * Both arguments are passed through unchanged.
 *
 * NOTE(review): the closing brace of this method is not part of the excerpt.
 *
 * @param flowProcess the current flow process, handed to the delegate as-is
 * @param conf        the configuration object, handed to the delegate as-is
 */
public void sinkConfInit(FlowProcess<Object> flowProcess, Object conf) {
  actualTap.sinkConfInit(flowProcess, conf);

代码示例来源:origin: cwensel/cascading

/**
 * Initializes this tap as a sink by forwarding the call to the wrapped {@code original} tap.
 *
 * NOTE(review): the method's braces are not part of the excerpt.
 *
 * @param flowProcess the current flow process, handed to {@code original} unchanged
 * @param conf        the configuration, handed to {@code original} unchanged
 */
public void sinkConfInit( FlowProcess<? extends Config> flowProcess, Config conf )
 original.sinkConfInit( flowProcess, conf );

代码示例来源:origin: cwensel/cascading

/**
 * Initializes the wrapped {@code original} tap as a sink after adapting both arguments:
 * the flow process goes through {@code processProvider} and the configuration through
 * {@code configProvider} before being handed to the delegate.
 *
 * NOTE(review): the method's braces are not part of the excerpt.
 *
 * @param flowProcess the caller's flow process, converted via {@code processProvider.apply}
 * @param conf        the caller's configuration, converted via {@code configProvider.apply}
 */
public void sinkConfInit( FlowProcess<? extends TConfig> flowProcess, TConfig conf )
 original.sinkConfInit( processProvider.apply( flowProcess ), configProvider.apply( conf ) );

代码示例来源:origin: cwensel/cascading

/**
 * Calls {@code sinkConfInit} on every tap in {@code traps}, using a copy of {@code conf}
 * rather than {@code conf} itself.
 *
 * NOTE(review): block braces are missing from this excerpt; nesting below is inferred from
 * the indentation only.
 *
 * @param flowProcess the flow process passed to each trap
 * @param conf        the job configuration that is copied before the traps see it
 * @param traps       the trap taps, keyed by name; only the values are used here
 */
private void initFromTraps( FlowProcess<JobConf> flowProcess, JobConf conf, Map<String, Tap> traps )
 if( !traps.isEmpty() )
  // a single copy is created and then shared by all traps in the loop
  JobConf trapConf = HadoopUtil.copyJobConf( conf );
  for( Tap tap : traps.values() )
   tap.sinkConfInit( flowProcess, trapConf );

代码示例来源:origin: cascading/cascading-hadoop2-mr1

/**
 * Calls {@code sinkConfInit} on every tap in {@code traps}, using a copy of {@code conf}
 * rather than {@code conf} itself.
 *
 * NOTE(review): block braces are missing from this excerpt; nesting below is inferred from
 * the indentation only.
 *
 * @param flowProcess the flow process passed to each trap
 * @param conf        the job configuration that is copied before the traps see it
 * @param traps       the trap taps, keyed by name; only the values are used here
 */
private void initFromTraps( FlowProcess<JobConf> flowProcess, JobConf conf, Map<String, Tap> traps )
 if( !traps.isEmpty() )
  // a single copy is created and then shared by all traps in the loop
  JobConf trapConf = HadoopUtil.copyJobConf( conf );
  for( Tap tap : traps.values() )
   tap.sinkConfInit( flowProcess, trapConf );

代码示例来源:origin: org.elasticsearch/elasticsearch-hadoop

/**
 * Initializes this tap as a sink by forwarding the call to the wrapped {@code actualTap}.
 * Both arguments are passed through unchanged.
 *
 * NOTE(review): the closing brace of this method is not part of the excerpt.
 *
 * @param flowProcess the current flow process, handed to the delegate as-is
 * @param conf        the configuration object, handed to the delegate as-is
 */
public void sinkConfInit(FlowProcess<Object> flowProcess, Object conf) {
  actualTap.sinkConfInit(flowProcess, conf);

代码示例来源:origin: cwensel/cascading

/**
 * Initializes the traps registered on {@code flowNode} as sinks, using a new {@code JobConf}
 * constructed from {@code conf}.
 *
 * NOTE(review): block braces are missing from this excerpt; nesting below is inferred from
 * the indentation only.
 *
 * @param flowNode    the node whose trap map is read
 * @param flowProcess the flow process passed to each trap
 * @param conf        the configuration the trap JobConf is built from
 */
protected void initFromTraps( FlowNode flowNode, FlowProcess<? extends Configuration> flowProcess, Configuration conf )
 Map<String, Tap> traps = flowNode.getTrapMap();
 if( !traps.isEmpty() )
  // one JobConf is built from conf and shared by every trap in the loop
  JobConf trapConf = new JobConf( conf );
  for( Tap tap : traps.values() )
   tap.sinkConfInit( flowProcess, trapConf );
   // setLocalMode is not shown here; presumably it carries a local-mode decision from
   // trapConf over to conf — TODO confirm against its definition
   setLocalMode( conf, trapConf, tap );

代码示例来源:origin: cascading/cascading-hadoop2-tez

/**
 * Initializes the traps registered on {@code flowNode} as sinks, using a new {@code JobConf}
 * constructed from {@code conf}.
 *
 * NOTE(review): block braces are missing from this excerpt; nesting below is inferred from
 * the indentation only.
 *
 * @param flowNode    the node whose trap map is read
 * @param flowProcess the flow process passed to each trap
 * @param conf        the configuration the trap JobConf is built from
 */
protected void initFromTraps( FlowNode flowNode, FlowProcess<? extends Configuration> flowProcess, Configuration conf )
 Map<String, Tap> traps = flowNode.getTrapMap();
 if( !traps.isEmpty() )
  // one JobConf is built from conf and shared by every trap in the loop
  JobConf trapConf = new JobConf( conf );
  for( Tap tap : traps.values() )
   tap.sinkConfInit( flowProcess, trapConf );
   // setLocalMode is not shown here; presumably it carries a local-mode decision from
   // trapConf over to conf — TODO confirm against its definition
   setLocalMode( conf, trapConf, tap );

代码示例来源:origin: cwensel/cascading

/**
 * Gives every tap in {@code taps} its own copy of {@code conf}, records that copy in
 * {@code tapProperties}, and runs the tap's configuration hook against the copy.
 *
 * NOTE(review): braces are missing from this excerpt, and an {@code else} appears to have
 * been dropped as well: as shown, both {@code sinkConfInit} and {@code sourceConfInit} follow
 * the {@code isSink} test. Presumably {@code sourceConfInit} is the non-sink branch — confirm
 * against the original source.
 *
 * @param flowProcess the flow process used to copy the config and passed to each tap
 * @param conf        the base properties that are copied per tap
 * @param taps        the taps to initialize
 * @param isSink      selects the sink initialization hook
 */
protected void initTaps( FlowProcess<Properties> flowProcess, Properties conf, Set<Tap> taps, boolean isSink )
 if( !taps.isEmpty() )
  for( Tap tap : taps )
   // each tap receives a separate copy, so the hook below does not write into conf directly
   Properties confCopy = flowProcess.copyConfig( conf );
   tapProperties.put( tap, confCopy ); // todo: store the diff, not the copy
   if( isSink )
    tap.sinkConfInit( flowProcess, confCopy );
    tap.sourceConfInit( flowProcess, confCopy );

代码示例来源:origin: com.twitter/maple

/**
 * Prepares this object for writing: lets {@code tap} configure {@code conf} as a sink, reads
 * the output format from {@code conf}, and obtains a record writer from that format.
 *
 * NOTE(review): the closing brace is not part of the excerpt.
 *
 * @throws IOException declared by the method; the visible call that can plausibly raise it
 *                     is {@code getRecordWriter}
 */
private void initialize() throws IOException {
 // the tap must configure conf first; the output format is read from conf on the next line
 tap.sinkConfInit(hadoopFlowProcess, conf);
 // NOTE(review): the text after the first ';' looks like the remainder of a stripped logging
 // call and is not valid Java as excerpted — restore from the original source
 OutputFormat outputFormat = conf.getOutputFormat();"Output format class is: " + outputFormat.getClass().toString());
 // the writer is named after the tap identifier; null and Reporter.NULL are passed for the
 // first and last arguments
 writer = outputFormat.getRecordWriter(null, conf, tap.getIdentifier(), Reporter.NULL);

代码示例来源:origin: cwensel/cascading

/**
 * Runs {@code sinkConfInit} on every child tap against its own copy of {@code conf} and
 * collects, per tap, the result of {@code diffConfigIntoMap} into {@code childConfigs}.
 *
 * NOTE(review): block braces are missing from this excerpt; the four statements after the
 * {@code for} header are assumed to form the loop body, based on indentation.
 *
 * @param flowProcess used to copy the configuration and to compute the per-tap diff
 * @param conf        the base configuration; the taps are given copies of it
 */
private void bridge( FlowProcess flowProcess, Object conf )
 // replaces any previously stored list
 childConfigs = new ArrayList<>();
 for( int i = 0; i < getTaps().length; i++ )
  Tap tap = getTaps()[ i ];
  // a fresh copy per tap, so one tap's settings are not passed on to the next
  Object newConfig = flowProcess.copyConfig( conf );
  tap.sinkConfInit( flowProcess, newConfig );
  // entries are appended in the same order as getTaps()
  childConfigs.add( flowProcess.diffConfigIntoMap( conf, newConfig ) );

代码示例来源:origin: cascading/cascading-hadoop2-mr1

/**
 * Applies the sink's configuration to {@code conf} and, when no output path results from
 * that and the output format is file based or unset, creates a {@code TempHfs} as
 * {@code tempSink} and applies its configuration afterwards.
 *
 * NOTE(review): block braces are missing from this excerpt; nesting below is inferred from
 * the indentation only.
 *
 * @param flowProcess the flow process passed to the sink and to the temporary sink
 * @param conf        the job configuration that both taps write their settings into
 */
protected void initFromSink( FlowProcess<JobConf> flowProcess, JobConf conf )
 // init sink first so tempSink can take precedence
 if( getSink() != null )
  getSink().sinkConfInit( flowProcess, conf );
 // null when the property is not set in conf
 Class<? extends OutputFormat> outputFormat = conf.getClass( "mapred.output.format.class", null, OutputFormat.class );
 boolean isFileOutputFormat = false;
 if( outputFormat != null )
  isFileOutputFormat = FileOutputFormat.class.isAssignableFrom( outputFormat );
 Path outputPath = FileOutputFormat.getOutputPath( conf );
 // if no output path is set, we need to substitute an alternative if the OutputFormat is file based
 // PartitionTap won't set the output, but will set an OutputFormat
 // MultiSinkTap won't set the output or set the OutputFormat
 // Non file based OutputFormats don't have an output path, but do have an OutputFormat set (JDBCTap..)
 if( outputPath == null && ( isFileOutputFormat || outputFormat == null ) )
  // NOTE(review): getSink() is null-checked above but dereferenced here without a check;
  // confirm whether a null sink can reach this branch
  tempSink = new TempHfs( conf, "tmp:/" + new Path( getSink().getIdentifier() ).toUri().getPath(), true );
 // tempSink exists because sink is writeDirect
 if( tempSink != null )
  tempSink.sinkConfInit( flowProcess, conf );

代码示例来源:origin: cwensel/cascading

/**
 * Applies the sink's configuration to {@code conf} and, when no output path results from
 * that and the output format is file based or unset, creates a {@code TempHfs} as
 * {@code tempSink} and applies its configuration afterwards.
 *
 * NOTE(review): block braces are missing from this excerpt; nesting below is inferred from
 * the indentation only.
 *
 * @param flowProcess the flow process passed to the sink and to the temporary sink
 * @param conf        the job configuration that both taps write their settings into
 */
protected void initFromSink( FlowProcess<JobConf> flowProcess, JobConf conf )
 // init sink first so tempSink can take precedence
 if( getSink() != null )
  getSink().sinkConfInit( flowProcess, conf );
 // null when the property is not set in conf
 Class<? extends OutputFormat> outputFormat = conf.getClass( "mapred.output.format.class", null, OutputFormat.class );
 boolean isFileOutputFormat = false;
 if( outputFormat != null )
  isFileOutputFormat = FileOutputFormat.class.isAssignableFrom( outputFormat );
 Path outputPath = FileOutputFormat.getOutputPath( conf );
 // if no output path is set, we need to substitute an alternative if the OutputFormat is file based
 // PartitionTap won't set the output, but will set an OutputFormat
 // MultiSinkTap won't set the output or set the OutputFormat
 // Non file based OutputFormats don't have an output path, but do have an OutputFormat set (JDBCTap..)
 if( outputPath == null && ( isFileOutputFormat || outputFormat == null ) )
  // NOTE(review): getSink() is null-checked above but dereferenced here without a check;
  // confirm whether a null sink can reach this branch
  tempSink = new TempHfs( conf, "tmp:/" + new Path( getSink().getIdentifier() ).toUri().getPath(), true );
 // tempSink exists because sink is writeDirect
 if( tempSink != null )
  tempSink.sinkConfInit( flowProcess, conf );

代码示例来源:origin: cwensel/cascading

/**
 * Configures {@code conf} for writing to this tap: sets the output path to this tap's full
 * identifier, runs the superclass initialization, then calls {@code makeLocal} and
 * {@code TupleSerialization.setSerializations} on the same configuration.
 *
 * NOTE(review): the method's braces are not part of the excerpt.
 *
 * @param process the current flow process, forwarded to the superclass
 * @param conf    the configuration that is modified in place
 */
public void sinkConfInit( FlowProcess<? extends Configuration> process, Configuration conf )
 Path qualifiedPath = new Path( getFullIdentifier( conf ) );
 // the output path is set before the superclass hook runs
 HadoopUtil.setOutputPath( conf, qualifiedPath );
 super.sinkConfInit( process, conf );
 // makeLocal is not shown here; going by the message it may switch the job to stand-alone
 // mode depending on the path — TODO confirm the exact condition
 makeLocal( conf, qualifiedPath, "forcing job to stand-alone mode, via sink: " );
 TupleSerialization.setSerializations( conf ); // allows Hfs to be used independent of Flow

代码示例来源:origin: cascading/cascading-hadoop2-io

/**
 * Configures {@code conf} for writing to this tap: sets the output path to this tap's full
 * identifier, runs the superclass initialization, then calls {@code makeLocal} and
 * {@code TupleSerialization.setSerializations} on the same configuration.
 *
 * NOTE(review): the method's braces are not part of the excerpt.
 *
 * @param process the current flow process, forwarded to the superclass
 * @param conf    the configuration that is modified in place
 */
public void sinkConfInit( FlowProcess<? extends Configuration> process, Configuration conf )
 Path qualifiedPath = new Path( getFullIdentifier( conf ) );
 // the output path is set before the superclass hook runs
 HadoopUtil.setOutputPath( conf, qualifiedPath );
 super.sinkConfInit( process, conf );
 // makeLocal is not shown here; going by the message it may switch the job to stand-alone
 // mode depending on the path — TODO confirm the exact condition
 makeLocal( conf, qualifiedPath, "forcing job to stand-alone mode, via sink: " );
 TupleSerialization.setSerializations( conf ); // allows Hfs to be used independent of Flow

代码示例来源:origin: cascading/cascading-jdbc-core

/**
 * Writes this tap's database connection settings into {@code conf} via
 * {@code DBConfiguration.configureDB} and then runs the superclass initialization.
 *
 * NOTE(review): this excerpt has lost lines. The statement guarded by {@code !isSink()} is
 * missing (presumably an early exit or a thrown exception), and the two {@code configureDB}
 * calls were presumably the two arms of an if/else on {@code username}. Confirm against the
 * original source before relying on the flow shown here.
 *
 * @param process the current flow process, forwarded to the superclass
 * @param conf    the configuration that receives the connection settings
 */
public void sinkConfInit( FlowProcess<? extends Configuration> process, Configuration conf )
 if( !isSink() )
 if( username == null )
  // variant without credentials
  DBConfiguration.configureDB( conf, driverClassName, connectionUrl );
  // variant with username and password
  DBConfiguration.configureDB( conf, driverClassName, connectionUrl, username, password );
 super.sinkConfInit( process, conf );

代码示例来源:origin: cwensel/cascading

/**
 * Prepares this object for writing: lets {@code tap} configure {@code conf} as a sink, reads
 * the output format from it, performs job and task setup and builds {@code filename} for
 * file based output formats, and finally obtains the record writer.
 *
 * NOTE(review): braces are missing from this excerpt; the statements indented under
 * {@code if( isFileOutputFormat )} are assumed to belong to it. An {@code else} between the
 * two {@code filename} assignments appears to have been dropped, and the line starting with
 * {@code \}} looks like the remainder of a stripped logging call.
 *
 * @throws IOException declared by the method; the visible calls that can plausibly raise it
 *                     are the setup helpers and {@code getRecordWriter}
 */
protected void initialize() throws IOException
 // the tap must configure conf first; the output format is read from conf on the next line
 tap.sinkConfInit( flowProcess, conf );
 OutputFormat outputFormat = asJobConfInstance( conf ).getOutputFormat();
 // todo: use OutputCommitter class
 isFileOutputFormat = outputFormat instanceof FileOutputFormat;
 if( isFileOutputFormat )
  Hadoop18TapUtil.setupJob( conf );
  Hadoop18TapUtil.setupTask( conf );
  // reads "mapred.task.partition" first, then "mapreduce.task.partition", then defaults to 0
  int partition = conf.getInt( "mapred.task.partition", conf.getInt( "mapreduce.task.partition", 0 ) );
  // a sequence of -1 is mapped to 0 for use in the file name
  long localSequence = sequence == -1 ? 0 : sequence;
  if( prefix != null )
   filename = String.format( filenamePattern, prefix, "/", partition, localSequence );
   // presumably the else branch: same pattern with empty prefix and separator — confirm
   filename = String.format( filenamePattern, "", "", partition, localSequence );
  } "creating path: {}", filename );
 writer = outputFormat.getRecordWriter( null, asJobConfInstance( conf ), filename, getReporter() );

代码示例来源:origin: cascading/cascading-hadoop2-io

/**
 * Prepares this object for writing: lets {@code tap} configure {@code conf} as a sink, reads
 * the output format from it, performs job and task setup and builds {@code filename} for
 * file based output formats, and finally obtains the record writer.
 *
 * NOTE(review): braces are missing from this excerpt; the statements indented under
 * {@code if( isFileOutputFormat )} are assumed to belong to it. An {@code else} between the
 * two {@code filename} assignments appears to have been dropped, and the line starting with
 * {@code \}} looks like the remainder of a stripped logging call.
 *
 * @throws IOException declared by the method; the visible calls that can plausibly raise it
 *                     are the setup helpers and {@code getRecordWriter}
 */
protected void initialize() throws IOException
 // the tap must configure conf first; the output format is read from conf on the next line
 tap.sinkConfInit( flowProcess, conf );
 OutputFormat outputFormat = asJobConfInstance( conf ).getOutputFormat();
 // todo: use OutputCommitter class
 isFileOutputFormat = outputFormat instanceof FileOutputFormat;
 if( isFileOutputFormat )
  Hadoop18TapUtil.setupJob( conf );
  Hadoop18TapUtil.setupTask( conf );
  // reads "mapred.task.partition" first, then "mapreduce.task.partition", then defaults to 0
  int partition = conf.getInt( "mapred.task.partition", conf.getInt( "mapreduce.task.partition", 0 ) );
  // a sequence of -1 is mapped to 0 for use in the file name
  long localSequence = sequence == -1 ? 0 : sequence;
  if( prefix != null )
   filename = String.format( filenamePattern, prefix, "/", partition, localSequence );
   // presumably the else branch: same pattern with empty prefix and separator — confirm
   filename = String.format( filenamePattern, "", "", partition, localSequence );
  } "creating path: {}", filename );
 writer = outputFormat.getRecordWriter( null, asJobConfInstance( conf ), filename, getReporter() );


/**
 * Writes the tuples of {@code data} to {@code tap}, treating it as a tap configured by a
 * {@code JobConf}. A fresh {@code JobConf} and {@code HadoopFlowProcess} are created for
 * the write.
 *
 * NOTE(review): this excerpt stops at the loop header; the loop body and any closing of the
 * collector are not visible here.
 *
 * @param tap the tap to write to; cast unchecked to a {@code JobConf}-based tap
 * @throws IOException declared by the method; presumably from opening the collector
 */
private void writeToHadoopTap(Tap<?, ?, ?> tap) throws IOException {
 // unchecked cast: the compiler cannot verify the config type parameter
 Tap<JobConf, ?, ?> hadoopTap = (Tap<JobConf, ?, ?>) tap;
 JobConf conf = new JobConf();
 HadoopFlowProcess flowProcess = new HadoopFlowProcess(conf);
 // sink initialization runs before the tap is opened for writing
 hadoopTap.sinkConfInit(flowProcess, conf);
 TupleEntryCollector collector = hadoopTap.openForWrite(flowProcess);
 for (TupleEntry tuple : data.asTupleEntryList()) {


/**
 * Writes the tuples of {@code data} to {@code tap}, treating it as a tap configured by
 * {@code Properties}. A fresh {@code Properties} object and {@code LocalFlowProcess} are
 * created for the write.
 *
 * NOTE(review): this excerpt stops at the loop header; the loop body and any closing of the
 * collector are not visible here.
 *
 * @param tap the tap to write to; cast unchecked to a {@code Properties}-based tap
 * @throws IOException declared by the method; presumably from opening the collector
 */
private void writeToLocalTap(Tap<?, ?, ?> tap) throws IOException {
 // unchecked cast: the compiler cannot verify the config type parameter
 Tap<Properties, ?, ?> localTap = (Tap<Properties, ?, ?>) tap;
 Properties conf = new Properties();
 LocalFlowProcess flowProcess = new LocalFlowProcess(conf);
 // step stats are set explicitly, built from a NullFlowStep and NullClientState.INSTANCE
 flowProcess.setStepStats(new LocalStepStats(new NullFlowStep(), NullClientState.INSTANCE));
 // sink initialization runs before the tap is opened for writing
 localTap.sinkConfInit(flowProcess, conf);
 TupleEntryCollector collector = localTap.openForWrite(flowProcess);
 for (TupleEntry tuple : data.asTupleEntryList()) {
