public static Tap getTapForID( Set<Tap> taps, String id )
 for( Tap tap : taps )
  if( tap ).equals( id ) )
   return tap;
 return null;

protected void addLocalCacheFiles( Configuration conf, URI uri )
 String key = CASCADING_LOCAL_RESOURCES + this );
 Collection<String> resources = conf.getStringCollection( key );
 if( resources == null )
  resources = new ArrayList<>();
 resources.add( uri.toString() );
 conf.setStrings( key, resources.toArray( new String[ resources.size() ] ) );

protected void addLocalCacheFiles( Configuration conf, URI uri )
 String key = CASCADING_LOCAL_RESOURCES + this );
 Collection<String> resources = conf.getStringCollection( key );
 if( resources == null )
  resources = new ArrayList<>();
 resources.add( uri.toString() );
 conf.setStrings( key, resources.toArray( new String[ resources.size() ] ) );

 /**
 * Creates and returns a unique ID for the given Tap; this value is cached and may be used to uniquely identify
 * the Tap instance in properties files etc.
 * <p>
 * This value is generally reproducible assuming the Tap identifier and the Scheme source and sink Fields remain consistent.
 * <p>
 * A {@code DecoratorTap} is unwrapped (recursively) before the ID is taken, so a decorator
 * and the tap it wraps share the same ID. This method is {@code synchronized}.
 *
 * @param tap the tap to identify
 * @return the ID of {@code tap} (or of the tap it decorates)
 */
public static synchronized String id( Tap tap )
 if( tap instanceof DecoratorTap )
  return id( ( (DecoratorTap) tap ).getOriginal() );
 // NOTE(review): the remainder of this method (the path for non-decorator taps, which must
 // return a String) and its closing brace are not present in this copy of the source.

public static String id( FlowElement flowElement )
 if( flowElement instanceof Pipe )
  return (Pipe) flowElement );
 if( flowElement instanceof Tap )
  return (Tap) flowElement );
 String id = Util.returnInstanceFieldIfExistsSafe( flowElement, "id" );
 if( id != null )
  return id;
 throw new IllegalArgumentException( "id not supported for: " + flowElement.getClass().getCanonicalName() );

protected SinkStage createSinkStage( Tap sink )
 String id = sink );
 LogicalOutput logicalOutput = outputMap.get( id );
 if( logicalOutput == null )
  logicalOutput = outputMap.get( flowProcess.getStringProperty( "cascading.node.sink." + id ) );
 if( logicalOutput == null )
  throw new IllegalStateException( "could not find output for: " + sink );
 return new TezSinkStage( flowProcess, sink, logicalOutput );

protected Path[] getLocalCacheFiles( FlowProcess<? extends Configuration> flowProcess ) throws IOException
 String key = CASCADING_REMOTE_RESOURCES + this );
 String property = flowProcess.getStringProperty( key );
 if( property == null )
  throw new TapException( "unable to find local resources property for: " + key );
 String[] split = property.split( "," );
 Path[] paths = new Path[ split.length ];
 for( int i = 0; i < split.length; i++ )
  paths[ i ] = new Path( split[ i ] );
 return paths;

protected SinkStage createSinkStage( Tap sink )
 String id = sink );
 LogicalOutput logicalOutput = outputMap.get( id );
 if( logicalOutput == null )
  logicalOutput = outputMap.get( flowProcess.getStringProperty( "cascading.node.sink." + id ) );
 if( logicalOutput == null )
  throw new IllegalStateException( "could not find output for: " + sink );
 return new TezSinkStage( flowProcess, sink, logicalOutput );

protected Path[] getLocalCacheFiles( FlowProcess<? extends Configuration> flowProcess ) throws IOException
 String key = CASCADING_REMOTE_RESOURCES + this );
 String property = flowProcess.getStringProperty( key );
 if( property == null )
  throw new TapException( "unable to find local resources property for: " + key );
 String[] split = property.split( "," );
 Path[] paths = new Path[ split.length ];
 for( int i = 0; i < split.length; i++ )
  paths[ i ] = new Path( split[ i ] );
 return paths;

protected SourceStage createSourceStage( Tap source, FlowProcess flowProcess )
 String id = source );
 LogicalInput logicalInput = inputMap.get( id );
 if( logicalInput == null )
  logicalInput = inputMap.get( flowProcess.getStringProperty( "cascading.node.source." + id ) );
 if( logicalInput == null )
  return new SourceStage( flowProcess, source );
 return new TezSourceStage( flowProcess, source, logicalInput );

protected SourceStage createSourceStage( Tap source, FlowProcess flowProcess )
 String id = source );
 LogicalInput logicalInput = inputMap.get( id );
 if( logicalInput == null )
  logicalInput = inputMap.get( flowProcess.getStringProperty( "cascading.node.source." + id ) );
 if( logicalInput == null )
  return new SourceStage( flowProcess, source );
 return new TezSourceStage( flowProcess, source, logicalInput );

public void sourceConfInit( FlowProcess<? extends Configuration> process, Configuration conf )
 if( HadoopUtil.isLocal( conf ) || this ).equals( conf.get( "cascading.node.source" ) ) || this ).equals( conf.get( "cascading.step.source" ) ) )
  { "can't use distributed cache. reading '{}' from hdfs", super.getIdentifier() );
  super.sourceConfInit( process, conf );
  registerHfs( process, conf, getHfs() );
 catch( IOException exception )
  throw new TapException( exception );

public void sourceConfInit( FlowProcess<? extends Configuration> process, Configuration conf )
 if( HadoopUtil.isLocal( conf ) || this ).equals( conf.get( "cascading.node.source" ) ) || this ).equals( conf.get( "cascading.step.source" ) ) )
  { "can't use distributed cache. reading '{}' from hdfs", super.getIdentifier() );
  super.sourceConfInit( process, conf );
  registerHfs( process, conf, getHfs() );
 catch( IOException exception )
  throw new TapException( exception );

protected void buildGraph()
 streamedHead = handleHead( this.source, flowProcess );
 Set<Tap> tributaries = ElementGraphs.findSources( elementGraph, Tap.class );
 tributaries.remove( this.source ); // we cannot stream and accumulate the same source
 // accumulated paths
 for( Object source : tributaries )
  final HadoopFlowProcess hadoopProcess = (HadoopFlowProcess) flowProcess;
  JobConf conf = hadoopProcess.getJobConf();
  // allows client side config to be used cluster side
  String property = conf.getRaw( "cascading.node.accumulated.source.conf." + (Tap) source ) );
  if( property == null )
   throw new IllegalStateException( "accumulated source conf property missing for: " + ( (Tap) source ).getIdentifier() );
  conf = getSourceConf( hadoopProcess, conf, property );
  // the reporter isn't provided until after the #run method is called
  flowProcess = new HadoopFlowProcess( hadoopProcess, conf )
   public Reporter getReporter()
    return hadoopProcess.getReporter();
  handleHead( (Tap) source, flowProcess );

protected void buildGraph()
 streamedHead = handleHead( this.source, flowProcess );
 Set<Tap> tributaries = ElementGraphs.findSources( elementGraph, Tap.class );
 tributaries.remove( this.source ); // we cannot stream and accumulate the same source
 // accumulated paths
 for( Object source : tributaries )
  final HadoopFlowProcess hadoopProcess = (HadoopFlowProcess) flowProcess;
  JobConf conf = hadoopProcess.getJobConf();
  // allows client side config to be used cluster side
  String property = conf.getRaw( "cascading.node.accumulated.source.conf." + (Tap) source ) );
  if( property == null )
   throw new IllegalStateException( "accumulated source conf property missing for: " + ( (Tap) source ).getIdentifier() );
  conf = getSourceConf( hadoopProcess, conf, property );
  // the reporter isn't provided until after the #run method is called
  flowProcess = new HadoopFlowProcess( hadoopProcess, conf )
   public Reporter getReporter()
    return hadoopProcess.getReporter();
  handleHead( (Tap) source, flowProcess );

// tag this job conf with the id of its source tap
streamedJobs[ i ].set( "cascading.step.source", Tap.id( tap ) );
// store the packed source conf under a key derived from the tap id so it can be looked up cluster side
conf.set( "cascading.node.accumulated.source.conf." + Tap.id( tap ), pack( map, conf ) );

// tag this job conf with the id of its source tap
streamedJobs[ i ].set( "cascading.step.source", Tap.id( tap ) );
// store the packed source conf under a key derived from the tap id so it can be looked up cluster side
conf.set( "cascading.node.accumulated.source.conf." + Tap.id( tap ), pack( map, conf ) );

private DataSet<Tuple> translateSource(FlowProcess flowProcess, ExecutionEnvironment env, FlowNode node, int dop) {
  Tap tap = this.getSingle(node.getSourceTaps());
  JobConf tapConfig = new JobConf(this.getNodeConfig(node));
  tap.sourceConfInit(flowProcess, tapConfig);
  tapConfig.set( "cascading.step.source", tap ) );
  Fields outFields = tap.getSourceFields();
  JobConf sourceConfig = new JobConf(this.getNodeConfig(node));
  MultiInputFormat.addInputFormat(sourceConfig, tapConfig);
  DataSet<Tuple> src = env
      .createInput(new TapInputFormat(node), new TupleTypeInfo(outFields))
          .withParameters(FlinkConfigConverter.toFlinkConfig(new Configuration(sourceConfig)));
  return src;

private void initialize() throws IOException
 // prevent collisions of configuration properties set client side if now cluster side
 String property = flowProcess.getStringProperty( "cascading.node.accumulated.source.conf." + tap ) );
 if( property == null )
  // default behavior is to accumulate paths, so remove any set prior
  conf = HadoopUtil.removePropertiesFrom( conf, "mapred.input.dir", "mapreduce.input.fileinputformat.inputdir" ); // hadoop2
  tap.sourceConfInit( flowProcess, conf );
 JobConf jobConf = asJobConfInstance( conf );
 inputFormat = jobConf.getInputFormat();
 if( inputFormat instanceof JobConfigurable )
  ( (JobConfigurable) inputFormat ).configure( jobConf );
 // do not test for existence, let hadoop decide how to handle the given path
 // this delegates globbing to the inputformat on split generation.
 splits = inputFormat.getSplits( jobConf, 1 );
 if( splits.length == 0 )
  complete = true;

private void initialize() throws IOException
 // prevent collisions of configuration properties set client side if now cluster side
 String property = flowProcess.getStringProperty( "cascading.node.accumulated.source.conf." + tap ) );
 if( property == null )
  // default behavior is to accumulate paths, so remove any set prior
  conf = HadoopUtil.removePropertiesFrom( conf, "mapred.input.dir", "mapreduce.input.fileinputformat.inputdir" ); // hadoop2
  tap.sourceConfInit( flowProcess, conf );
 JobConf jobConf = asJobConfInstance( conf );
 inputFormat = jobConf.getInputFormat();
 if( inputFormat instanceof JobConfigurable )
  ( (JobConfigurable) inputFormat ).configure( jobConf );
 // do not test for existence, let hadoop decide how to handle the given path
 // this delegates globbing to the inputformat on split generation.
 splits = inputFormat.getSplits( jobConf, 1 );
 if( splits.length == 0 )
  complete = true;
