Usage and Code Examples of the cascading.tap.Tap.id() Method


This article collects Java code examples for the cascading.tap.Tap.id() method, showing how Tap.id() is used in practice. The examples were extracted from selected open-source projects hosted on platforms such as GitHub, Stack Overflow, and Maven, and should serve as useful references. Details of the Tap.id() method:
Package: cascading.tap
Class: Tap
Method: id

About Tap.id

The upstream Javadoc summary is simply "Field id"; the full Javadoc, reproduced in one of the examples below, explains that the value is a cached, unique identifier for the Tap instance.
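
Before the extracted examples, here is a minimal, self-contained sketch of how the static method is typically called. This is illustrative only, not code from the projects below; the Hfs path and TextLine scheme are placeholder assumptions:

import cascading.scheme.hadoop.TextLine;
import cascading.tap.Tap;
import cascading.tap.hadoop.Hfs;

public class TapIdExample
 {
 public static void main( String[] args )
  {
  // hypothetical source tap; the scheme and path are placeholders
  Tap tap = new Hfs( new TextLine(), "hdfs://example/input" );

  // Tap.id() returns a cached String that uniquely identifies the tap;
  // the examples below use it as a suffix on configuration property keys
  String id = Tap.id( tap );
  System.out.println( "tap id: " + id );
  }
 }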

Code Examples

Example source: cwensel/cascading

public static Tap getTapForID( Set<Tap> taps, String id )
 {
 for( Tap tap : taps )
  {
  if( Tap.id( tap ).equals( id ) )
   return tap;
  }
 return null;
 }
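
A brief usage sketch for the helper above; sourceTap, sinkTap, and the set are hypothetical:

Set<Tap> taps = new HashSet<>();
taps.add( sourceTap ); // taps assumed to be created elsewhere
taps.add( sinkTap );

// round-trip: derive the id, then resolve it back to the tap instance
String id = Tap.id( sourceTap );
Tap found = getTapForID( taps, id ); // returns sourceTap, or null if nothing matches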

Example source: cascading/cascading-hadoop2-tez (identical code also appears in cwensel/cascading)

@Override
protected void addLocalCacheFiles( Configuration conf, URI uri )
 {
 // the property key is suffixed with the tap id, so each tap tracks its own resources
 String key = CASCADING_LOCAL_RESOURCES + Tap.id( this );
 Collection<String> resources = conf.getStringCollection( key );
 if( resources == null )
  resources = new ArrayList<>();
 resources.add( uri.toString() );
 conf.setStrings( key, resources.toArray( new String[ resources.size() ] ) );
 }

Example source: cwensel/cascading

/**
 * Creates and returns a unique ID for the given Tap, this value is cached and may be used to uniquely identify
 * the Tap instance in properties files etc.
 * <p>
 * This value is generally reproducible assuming the Tap identifier and the Scheme source and sink Fields remain consistent.
 *
 * @param tap of type Tap
 * @return of type String
 */
public static synchronized String id( Tap tap )
 {
 if( tap instanceof DecoratorTap )
  return id( ( (DecoratorTap) tap ).getOriginal() );
 return tap.id;
 }
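
Two properties follow from the implementation above: the id is stable for a given tap, and a DecoratorTap is transparent, delegating to the tap it wraps. A sketch, assuming DecoratorTap can be constructed directly around an existing tap:

Tap original = new Hfs( new TextLine(), "hdfs://example/data" );
Tap decorated = new DecoratorTap( original );

// the decorator unwraps to the original, so both calls return the same id
assert Tap.id( original ).equals( Tap.id( decorated ) );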

Example source: cwensel/cascading

public static String id( FlowElement flowElement )
 {
 if( flowElement instanceof Pipe )
  return Pipe.id( (Pipe) flowElement );
 if( flowElement instanceof Tap )
  return Tap.id( (Tap) flowElement );
 String id = Util.returnInstanceFieldIfExistsSafe( flowElement, "id" );
 if( id != null )
  return id;
 throw new IllegalArgumentException( "id not supported for: " + flowElement.getClass().getCanonicalName() );
 }

Example source: cascading/cascading-hadoop2-tez (identical code also appears in cwensel/cascading)

@Override
protected SinkStage createSinkStage( Tap sink )
 {
 String id = Tap.id( sink );
 // look up the Tez LogicalOutput registered under the tap id,
 // falling back to the node-scoped sink property
 LogicalOutput logicalOutput = outputMap.get( id );
 if( logicalOutput == null )
  logicalOutput = outputMap.get( flowProcess.getStringProperty( "cascading.node.sink." + id ) );
 if( logicalOutput == null )
  throw new IllegalStateException( "could not find output for: " + sink );
 return new TezSinkStage( flowProcess, sink, logicalOutput );
 }

Example source: cwensel/cascading (identical code also appears in cascading/cascading-hadoop2-tez)

@Override
protected Path[] getLocalCacheFiles( FlowProcess<? extends Configuration> flowProcess ) throws IOException
 {
 String key = CASCADING_REMOTE_RESOURCES + Tap.id( this );
 String property = flowProcess.getStringProperty( key );
 if( property == null )
  throw new TapException( "unable to find local resources property for: " + key );
 String[] split = property.split( "," );
 Path[] paths = new Path[ split.length ];
 for( int i = 0; i < split.length; i++ )
  paths[ i ] = new Path( split[ i ] );
 return paths;
 }
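
Note the convention shared by addLocalCacheFiles and getLocalCacheFiles above: appending Tap.id( this ) to a property-key prefix (CASCADING_LOCAL_RESOURCES or CASCADING_REMOTE_RESOURCES) gives each tap its own collision-free entry in the job Configuration. The writer stores the resources as a string collection, and the reader splits the comma-joined value back into Path objects.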

Example source: cwensel/cascading (identical code also appears in cascading/cascading-hadoop2-tez)

protected SourceStage createSourceStage( Tap source, FlowProcess flowProcess )
 {
 String id = Tap.id( source );
 LogicalInput logicalInput = inputMap.get( id );
 if( logicalInput == null )
  logicalInput = inputMap.get( flowProcess.getStringProperty( "cascading.node.source." + id ) );
 if( logicalInput == null )
  return new SourceStage( flowProcess, source );
 return new TezSourceStage( flowProcess, source, logicalInput );
 }

Example source: cwensel/cascading (identical code also appears in cascading/cascading-hadoop2-io)

@Override
public void sourceConfInit( FlowProcess<? extends Configuration> process, Configuration conf )
 {
 if( HadoopUtil.isLocal( conf ) ||
  Tap.id( this ).equals( conf.get( "cascading.node.source" ) ) ||
  Tap.id( this ).equals( conf.get( "cascading.step.source" ) ) )
  {
  LOG.info( "can't use distributed cache. reading '{}' from hdfs", super.getIdentifier() );
  super.sourceConfInit( process, conf );
  return;
  }
 try
  {
  registerHfs( process, conf, getHfs() );
  }
 catch( IOException exception )
  {
  throw new TapException( exception );
  }
 }

Example source: cwensel/cascading (identical code also appears in cascading/cascading-hadoop2-mr1)

protected void buildGraph()
 {
 streamedHead = handleHead( this.source, flowProcess );
 Set<Tap> tributaries = ElementGraphs.findSources( elementGraph, Tap.class );
 tributaries.remove( this.source ); // we cannot stream and accumulate the same source
 // accumulated paths
 for( Object source : tributaries )
  {
  final HadoopFlowProcess hadoopProcess = (HadoopFlowProcess) flowProcess;
  JobConf conf = hadoopProcess.getJobConf();
  // allows client side config to be used cluster side
  String property = conf.getRaw( "cascading.node.accumulated.source.conf." + Tap.id( (Tap) source ) );
  if( property == null )
   throw new IllegalStateException( "accumulated source conf property missing for: " + ( (Tap) source ).getIdentifier() );
  conf = getSourceConf( hadoopProcess, conf, property );
  // the reporter isn't provided until after the #run method is called
  flowProcess = new HadoopFlowProcess( hadoopProcess, conf )
   {
   @Override
   public Reporter getReporter()
    {
    return hadoopProcess.getReporter();
    }
   };
  handleHead( (Tap) source, flowProcess );
  }
 }

Example source: cwensel/cascading (identical code also appears in cascading/cascading-hadoop2-mr1)

// fragment from the step configuration: the streamed source tap is recorded by id,
// and each accumulated source gets a packed, per-tap configuration entry
streamedJobs[ i ].set( "cascading.step.source", Tap.id( tap ) );
conf.set( "cascading.node.accumulated.source.conf." + Tap.id( tap ), pack( map, conf ) );

Example source: dataArtisans/cascading-flink

private DataSet<Tuple> translateSource(FlowProcess flowProcess, ExecutionEnvironment env, FlowNode node, int dop) {
  Tap tap = this.getSingle(node.getSourceTaps());
  JobConf tapConfig = new JobConf(this.getNodeConfig(node));
  tap.sourceConfInit(flowProcess, tapConfig);
   // record which tap is the step's source, keyed by its unique id
   tapConfig.set( "cascading.step.source", Tap.id( tap ) );
  Fields outFields = tap.getSourceFields();
  registerKryoTypes(outFields);
  JobConf sourceConfig = new JobConf(this.getNodeConfig(node));
  MultiInputFormat.addInputFormat(sourceConfig, tapConfig);
  DataSet<Tuple> src = env
      .createInput(new TapInputFormat(node), new TupleTypeInfo(outFields))
          .name(tap.getIdentifier())
          .setParallelism(dop)
          .withParameters(FlinkConfigConverter.toFlinkConfig(new Configuration(sourceConfig)));
  return src;
}

Example source: cwensel/cascading (identical code also appears in cascading/cascading-hadoop2-io)

private void initialize() throws IOException
 {
 // prevent collisions of configuration properties set client side if now cluster side
 String property = flowProcess.getStringProperty( "cascading.node.accumulated.source.conf." + Tap.id( tap ) );
 if( property == null )
  {
  // default behavior is to accumulate paths, so remove any set prior
  conf = HadoopUtil.removePropertiesFrom( conf, "mapred.input.dir", "mapreduce.input.fileinputformat.inputdir" ); // hadoop2
  tap.sourceConfInit( flowProcess, conf );
  }
 JobConf jobConf = asJobConfInstance( conf );
 inputFormat = jobConf.getInputFormat();
 if( inputFormat instanceof JobConfigurable )
  ( (JobConfigurable) inputFormat ).configure( jobConf );
 // do not test for existence, let hadoop decide how to handle the given path
 // this delegates globbing to the inputformat on split generation.
 splits = inputFormat.getSplits( jobConf, 1 );
 if( splits.length == 0 )
  complete = true;
 }
