本文整理了Java中cascading.tap.Tap.id()方法的一些代码示例,展示了Tap.id()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。Tap.id()方法的具体详情如下:
包路径:cascading.tap.Tap
类名称:Tap
方法名:id
[英]Field id
[中]字段id
代码示例来源:origin: cwensel/cascading
/**
 * Returns the Tap from the given set whose unique ID equals the given id,
 * or {@code null} if no such Tap exists.
 *
 * @param taps the candidate Taps to search
 * @param id   the unique Tap ID to match
 * @return the matching Tap, or null when none matches
 */
public static Tap getTapForID( Set<Tap> taps, String id )
  {
  // linear scan; tap sets are expected to be small
  for( Tap candidate : taps )
    {
    if( Tap.id( candidate ).equals( id ) )
      return candidate;
    }

  return null;
  }
代码示例来源:origin: cascading/cascading-hadoop2-tez
@Override
protected void addLocalCacheFiles( Configuration conf, URI uri )
  {
  // accumulate this URI under a property key scoped to this tap's unique id
  String propertyKey = CASCADING_LOCAL_RESOURCES + Tap.id( this );

  Collection<String> cached = conf.getStringCollection( propertyKey );

  if( cached == null )
    cached = new ArrayList<>();

  cached.add( uri.toString() );

  // write the full list back as a multi-valued string property
  conf.setStrings( propertyKey, cached.toArray( new String[ cached.size() ] ) );
  }
}
代码示例来源:origin: cwensel/cascading
@Override
protected void addLocalCacheFiles( Configuration conf, URI uri )
  {
  // accumulate this URI under a property key scoped to this tap's unique id
  String propertyKey = CASCADING_LOCAL_RESOURCES + Tap.id( this );

  Collection<String> cached = conf.getStringCollection( propertyKey );

  if( cached == null )
    cached = new ArrayList<>();

  cached.add( uri.toString() );

  // write the full list back as a multi-valued string property
  conf.setStrings( propertyKey, cached.toArray( new String[ cached.size() ] ) );
  }
}
代码示例来源:origin: cwensel/cascading
/**
 * Creates and returns a unique ID for the given Tap; the value is cached and may
 * be used to uniquely identify the Tap instance in properties files etc.
 * <p>
 * The value is generally reproducible as long as the Tap identifier and the
 * Scheme source and sink Fields remain consistent.
 *
 * @param tap of type Tap
 * @return of type String
 */
public static synchronized String id( Tap tap )
  {
  // unwrap any DecoratorTap layers so a decorated tap shares its original's id
  Tap current = tap;

  while( current instanceof DecoratorTap )
    current = ( (DecoratorTap) current ).getOriginal();

  return current.id;
  }
代码示例来源:origin: cwensel/cascading
/**
 * Resolves the unique ID of any FlowElement: Pipes and Taps expose dedicated
 * accessors, other elements are probed reflectively for an "id" field.
 *
 * @param flowElement the element whose ID is requested
 * @return the element's unique ID
 * @throws IllegalArgumentException when the element exposes no ID
 */
public static String id( FlowElement flowElement )
  {
  if( flowElement instanceof Pipe )
    return Pipe.id( (Pipe) flowElement );

  if( flowElement instanceof Tap )
    return Tap.id( (Tap) flowElement );

  // fall back to a reflective read of an "id" field, if one exists
  String found = Util.returnInstanceFieldIfExistsSafe( flowElement, "id" );

  if( found == null )
    throw new IllegalArgumentException( "id not supported for: " + flowElement.getClass().getCanonicalName() );

  return found;
  }
代码示例来源:origin: cascading/cascading-hadoop2-tez
@Override
protected SinkStage createSinkStage( Tap sink )
  {
  String sinkId = Tap.id( sink );

  // try a direct lookup first, then the node-scoped sink property mapping
  LogicalOutput output = outputMap.get( sinkId );

  if( output == null )
    output = outputMap.get( flowProcess.getStringProperty( "cascading.node.sink." + sinkId ) );

  if( output == null )
    throw new IllegalStateException( "could not find output for: " + sink );

  return new TezSinkStage( flowProcess, sink, output );
  }
代码示例来源:origin: cwensel/cascading
@Override
protected Path[] getLocalCacheFiles( FlowProcess<? extends Configuration> flowProcess ) throws IOException
  {
  String propertyKey = CASCADING_REMOTE_RESOURCES + Tap.id( this );
  String value = flowProcess.getStringProperty( propertyKey );

  if( value == null )
    throw new TapException( "unable to find local resources property for: " + propertyKey );

  // the property holds a comma separated list of paths
  String[] parts = value.split( "," );
  Path[] results = new Path[ parts.length ];

  int index = 0;

  for( String part : parts )
    results[ index++ ] = new Path( part );

  return results;
  }
代码示例来源:origin: cwensel/cascading
@Override
protected SinkStage createSinkStage( Tap sink )
  {
  String sinkId = Tap.id( sink );

  // try a direct lookup first, then the node-scoped sink property mapping
  LogicalOutput output = outputMap.get( sinkId );

  if( output == null )
    output = outputMap.get( flowProcess.getStringProperty( "cascading.node.sink." + sinkId ) );

  if( output == null )
    throw new IllegalStateException( "could not find output for: " + sink );

  return new TezSinkStage( flowProcess, sink, output );
  }
代码示例来源:origin: cascading/cascading-hadoop2-tez
@Override
protected Path[] getLocalCacheFiles( FlowProcess<? extends Configuration> flowProcess ) throws IOException
  {
  String propertyKey = CASCADING_REMOTE_RESOURCES + Tap.id( this );
  String value = flowProcess.getStringProperty( propertyKey );

  if( value == null )
    throw new TapException( "unable to find local resources property for: " + propertyKey );

  // the property holds a comma separated list of paths
  String[] parts = value.split( "," );
  Path[] results = new Path[ parts.length ];

  int index = 0;

  for( String part : parts )
    results[ index++ ] = new Path( part );

  return results;
  }
代码示例来源:origin: cwensel/cascading
protected SourceStage createSourceStage( Tap source, FlowProcess flowProcess )
  {
  String sourceId = Tap.id( source );

  // prefer a direct lookup, then the node-scoped source property mapping
  LogicalInput input = inputMap.get( sourceId );

  if( input == null )
    input = inputMap.get( flowProcess.getStringProperty( "cascading.node.source." + sourceId ) );

  // no logical input registered: read the source directly, not via Tez
  if( input == null )
    return new SourceStage( flowProcess, source );

  return new TezSourceStage( flowProcess, source, input );
  }
代码示例来源:origin: cascading/cascading-hadoop2-tez
protected SourceStage createSourceStage( Tap source, FlowProcess flowProcess )
  {
  String sourceId = Tap.id( source );

  // prefer a direct lookup, then the node-scoped source property mapping
  LogicalInput input = inputMap.get( sourceId );

  if( input == null )
    input = inputMap.get( flowProcess.getStringProperty( "cascading.node.source." + sourceId ) );

  // no logical input registered: read the source directly, not via Tez
  if( input == null )
    return new SourceStage( flowProcess, source );

  return new TezSourceStage( flowProcess, source, input );
  }
代码示例来源:origin: cwensel/cascading
@Override
// Configures this tap as a job source. Falls back to the default HDFS read when
// running locally, or when this tap is the configured node/step source — cases
// where the distributed cache cannot be used. Otherwise registers the wrapped
// Hfs tap so its files are distributed via the cache.
public void sourceConfInit( FlowProcess<? extends Configuration> process, Configuration conf )
{
if( HadoopUtil.isLocal( conf ) ||
Tap.id( this ).equals( conf.get( "cascading.node.source" ) ) ||
Tap.id( this ).equals( conf.get( "cascading.step.source" ) ) )
{
LOG.info( "can't use distributed cache. reading '{}' from hdfs", super.getIdentifier() );
// delegate to the superclass for a plain HDFS read
super.sourceConfInit( process, conf );
return;
}
try
{
registerHfs( process, conf, getHfs() );
}
catch( IOException exception )
{
// wrap in TapException, preserving the original cause
throw new TapException( exception );
}
}
代码示例来源:origin: cascading/cascading-hadoop2-io
@Override
// Configures this tap as a job source. Falls back to the default HDFS read when
// running locally, or when this tap is the configured node/step source — cases
// where the distributed cache cannot be used. Otherwise registers the wrapped
// Hfs tap so its files are distributed via the cache.
public void sourceConfInit( FlowProcess<? extends Configuration> process, Configuration conf )
{
if( HadoopUtil.isLocal( conf ) ||
Tap.id( this ).equals( conf.get( "cascading.node.source" ) ) ||
Tap.id( this ).equals( conf.get( "cascading.step.source" ) ) )
{
LOG.info( "can't use distributed cache. reading '{}' from hdfs", super.getIdentifier() );
// delegate to the superclass for a plain HDFS read
super.sourceConfInit( process, conf );
return;
}
try
{
registerHfs( process, conf, getHfs() );
}
catch( IOException exception )
{
// wrap in TapException, preserving the original cause
throw new TapException( exception );
}
}
代码示例来源:origin: cwensel/cascading
// Builds this node's stream graph: the primary source is streamed, and every
// other Tap found in the element graph is handled as an accumulated source
// whose configuration was packed client side under a tap-scoped property.
protected void buildGraph()
{
streamedHead = handleHead( this.source, flowProcess );
Set<Tap> tributaries = ElementGraphs.findSources( elementGraph, Tap.class );
tributaries.remove( this.source ); // we cannot stream and accumulate the same source
// accumulated paths
for( Object source : tributaries )
{
final HadoopFlowProcess hadoopProcess = (HadoopFlowProcess) flowProcess;
JobConf conf = hadoopProcess.getJobConf();
// allows client side config to be used cluster side
String property = conf.getRaw( "cascading.node.accumulated.source.conf." + Tap.id( (Tap) source ) );
if( property == null )
throw new IllegalStateException( "accumulated source conf property missing for: " + ( (Tap) source ).getIdentifier() );
conf = getSourceConf( hadoopProcess, conf, property );
// the reporter isn't provided until after the #run method is called
// NOTE(review): flowProcess is reassigned each iteration, so later tributaries
// wrap the process created for the previous one — confirm this layering is intended.
flowProcess = new HadoopFlowProcess( hadoopProcess, conf )
{
@Override
public Reporter getReporter()
{
// delegate so a reporter set later on the parent process is visible here
return hadoopProcess.getReporter();
}
};
handleHead( (Tap) source, flowProcess );
}
}
代码示例来源:origin: cascading/cascading-hadoop2-mr1
// Builds this node's stream graph: the primary source is streamed, and every
// other Tap found in the element graph is handled as an accumulated source
// whose configuration was packed client side under a tap-scoped property.
protected void buildGraph()
{
streamedHead = handleHead( this.source, flowProcess );
Set<Tap> tributaries = ElementGraphs.findSources( elementGraph, Tap.class );
tributaries.remove( this.source ); // we cannot stream and accumulate the same source
// accumulated paths
for( Object source : tributaries )
{
final HadoopFlowProcess hadoopProcess = (HadoopFlowProcess) flowProcess;
JobConf conf = hadoopProcess.getJobConf();
// allows client side config to be used cluster side
String property = conf.getRaw( "cascading.node.accumulated.source.conf." + Tap.id( (Tap) source ) );
if( property == null )
throw new IllegalStateException( "accumulated source conf property missing for: " + ( (Tap) source ).getIdentifier() );
conf = getSourceConf( hadoopProcess, conf, property );
// the reporter isn't provided until after the #run method is called
// NOTE(review): flowProcess is reassigned each iteration, so later tributaries
// wrap the process created for the previous one — confirm this layering is intended.
flowProcess = new HadoopFlowProcess( hadoopProcess, conf )
{
@Override
public Reporter getReporter()
{
// delegate so a reporter set later on the parent process is visible here
return hadoopProcess.getReporter();
}
};
handleHead( (Tap) source, flowProcess );
}
}
代码示例来源:origin: cwensel/cascading
// Snippet fragment: records this tap as the streamed step source, and packs its
// client-side source configuration under a tap-scoped key for cluster-side use.
streamedJobs[ i ].set( "cascading.step.source", Tap.id( tap ) );
conf.set( "cascading.node.accumulated.source.conf." + Tap.id( tap ), pack( map, conf ) );
代码示例来源:origin: cascading/cascading-hadoop2-mr1
// Snippet fragment: records this tap as the streamed step source, and packs its
// client-side source configuration under a tap-scoped key for cluster-side use.
streamedJobs[ i ].set( "cascading.step.source", Tap.id( tap ) );
conf.set( "cascading.node.accumulated.source.conf." + Tap.id( tap ), pack( map, conf ) );
代码示例来源:origin: dataArtisans/cascading-flink
// Translates a Cascading source Tap node into a Flink DataSet<Tuple> input,
// configuring the tap, registering its output field types with Kryo, and
// wiring the resulting Hadoop config into the Flink input format.
private DataSet<Tuple> translateSource(FlowProcess flowProcess, ExecutionEnvironment env, FlowNode node, int dop) {
Tap tap = this.getSingle(node.getSourceTaps());
JobConf tapConfig = new JobConf(this.getNodeConfig(node));
// let the tap contribute its source settings before tagging it as the step source
tap.sourceConfInit(flowProcess, tapConfig);
tapConfig.set( "cascading.step.source", Tap.id( tap ) );
Fields outFields = tap.getSourceFields();
registerKryoTypes(outFields);
JobConf sourceConfig = new JobConf(this.getNodeConfig(node));
// merge the tap-specific config into the multi-input source config
MultiInputFormat.addInputFormat(sourceConfig, tapConfig);
DataSet<Tuple> src = env
.createInput(new TapInputFormat(node), new TupleTypeInfo(outFields))
.name(tap.getIdentifier())
.setParallelism(dop)
.withParameters(FlinkConfigConverter.toFlinkConfig(new Configuration(sourceConfig)));
return src;
}
代码示例来源:origin: cwensel/cascading
// Lazily prepares this tap's Hadoop input format and its input splits.
private void initialize() throws IOException
{
// prevent collisions of configuration properties set client side if now cluster side
String property = flowProcess.getStringProperty( "cascading.node.accumulated.source.conf." + Tap.id( tap ) );
if( property == null )
{
// default behavior is to accumulate paths, so remove any set prior
conf = HadoopUtil.removePropertiesFrom( conf, "mapred.input.dir", "mapreduce.input.fileinputformat.inputdir" ); // hadoop2
tap.sourceConfInit( flowProcess, conf );
}
JobConf jobConf = asJobConfInstance( conf );
inputFormat = jobConf.getInputFormat();
// JobConfigurable input formats expect a configure() callback before use
if( inputFormat instanceof JobConfigurable )
( (JobConfigurable) inputFormat ).configure( jobConf );
// do not test for existence, let hadoop decide how to handle the given path
// this delegates globbing to the inputformat on split generation.
splits = inputFormat.getSplits( jobConf, 1 );
// no splits means there is nothing to read; mark iteration as complete
if( splits.length == 0 )
complete = true;
}
代码示例来源:origin: cascading/cascading-hadoop2-io
// Lazily prepares this tap's Hadoop input format and its input splits.
private void initialize() throws IOException
{
// prevent collisions of configuration properties set client side if now cluster side
String property = flowProcess.getStringProperty( "cascading.node.accumulated.source.conf." + Tap.id( tap ) );
if( property == null )
{
// default behavior is to accumulate paths, so remove any set prior
conf = HadoopUtil.removePropertiesFrom( conf, "mapred.input.dir", "mapreduce.input.fileinputformat.inputdir" ); // hadoop2
tap.sourceConfInit( flowProcess, conf );
}
JobConf jobConf = asJobConfInstance( conf );
inputFormat = jobConf.getInputFormat();
// JobConfigurable input formats expect a configure() callback before use
if( inputFormat instanceof JobConfigurable )
( (JobConfigurable) inputFormat ).configure( jobConf );
// do not test for existence, let hadoop decide how to handle the given path
// this delegates globbing to the inputformat on split generation.
splits = inputFormat.getSplits( jobConf, 1 );
// no splits means there is nothing to read; mark iteration as complete
if( splits.length == 0 )
complete = true;
}
内容来源于网络,如有侵权,请联系作者删除!