本文整理了Java中cascading.tap.Tap.getFullIdentifier()方法的一些代码示例,展示了Tap.getFullIdentifier()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。Tap.getFullIdentifier()方法的具体详情如下:
包路径:cascading.tap.Tap
类名称:Tap
方法名:getFullIdentifier
[英]Method getFullIdentifier returns a fully qualified resource identifier.
[中]方法getFullIdentifier返回完全限定的资源标识符。
代码示例来源:origin: cwensel/cascading
/**
 * Returns the full identifier that the wrapped {@code original} reports for the given configuration.
 *
 * @param conf configuration passed through unchanged to {@code original}
 * @return the result of {@code original.getFullIdentifier( conf )}
 */
@Override
public String getFullIdentifier( Config conf )
{
  final String identifier = original.getFullIdentifier( conf );

  return identifier;
}
代码示例来源:origin: cwensel/cascading
/**
 * Returns the full identifier that the wrapped {@code original} reports for the given flow process.
 *
 * @param flowProcess flow process passed through unchanged to {@code original}
 * @return the result of {@code original.getFullIdentifier( flowProcess )}
 */
@Override
public String getFullIdentifier( FlowProcess<? extends Config> flowProcess )
{
  final String identifier = original.getFullIdentifier( flowProcess );

  return identifier;
}
代码示例来源:origin: cwensel/cascading
/**
 * Returns the full identifier reported by {@code original}, after mapping the incoming
 * configuration through {@code configProvider}.
 *
 * @param conf configuration handed to {@code configProvider.apply}
 * @return the identifier {@code original} reports for the mapped configuration
 */
@Override
public String getFullIdentifier( TConfig conf )
{
  final String identifier = original.getFullIdentifier( configProvider.apply( conf ) );

  return identifier;
}
代码示例来源:origin: cwensel/cascading
/**
 * Returns the full identifier reported by {@code original}, after mapping the incoming
 * flow process through {@code processProvider}.
 *
 * @param flowProcess flow process handed to {@code processProvider.apply}
 * @return the identifier {@code original} reports for the mapped flow process
 */
@Override
public String getFullIdentifier( FlowProcess<? extends TConfig> flowProcess )
{
  final String identifier = original.getFullIdentifier( processProvider.apply( flowProcess ) );

  return identifier;
}
代码示例来源:origin: cwensel/cascading
/**
 * Produces the vertex value for a tap: its full identifier resolved against the flow's configuration.
 *
 * @param flow flow whose {@code getConfig()} result is used for the lookup
 * @param tap  tap whose full identifier is returned
 * @return the tap's full identifier
 */
protected String getVertex( Flow flow, Tap tap )
{
  final String vertex = tap.getFullIdentifier( flow.getConfig() );

  return vertex;
}
}
代码示例来源:origin: cwensel/cascading
/**
 * Method getFullIdentifier returns a fully qualified resource identifier, resolved using the
 * configuration obtained from the given flow process.
 *
 * @param flowProcess of type FlowProcess; its {@code getConfig()} result is used for the lookup
 * @return String the identifier produced by the configuration-based overload
 */
public String getFullIdentifier( FlowProcess<? extends Config> flowProcess )
{
  Config config = flowProcess.getConfig();

  return getFullIdentifier( config );
}
代码示例来源:origin: cwensel/cascading
/**
 * Creates the iterator and resets it onto a single partition entry iterator built from the
 * given record reader and the parent's full identifier.
 *
 * @param flowProcess flow process used to resolve the parent identifier and to build the entry iterator
 * @param input       record reader to iterate over; must not be null
 * @throws IOException if {@code input} is null
 */
public CombinePartitionIterator( final FlowProcess<? extends Configuration> flowProcess, RecordReader input ) throws IOException
{
  super( getSourceFields() );

  // reject a missing reader before doing any other work
  if( input == null )
    throw new IOException( "input cannot be null" );

  String parentIdentifier = parent.getFullIdentifier( flowProcess );

  List<Iterator<Tuple>> entryIterators = new ArrayList<Iterator<Tuple>>();
  entryIterators.add( createPartitionEntryIterator( flowProcess, input, parentIdentifier ) );

  reset( entryIterators );
}
代码示例来源:origin: cascading/cascading-hadoop2-io
/**
 * Creates the iterator and resets it onto a single partition entry iterator built from the
 * given record reader and the parent's full identifier.
 *
 * @param flowProcess flow process used to resolve the parent identifier and to build the entry iterator
 * @param input       record reader to iterate over; must not be null
 * @throws IOException if {@code input} is null
 */
public CombinePartitionIterator( final FlowProcess<? extends Configuration> flowProcess, RecordReader input ) throws IOException
{
  super( getSourceFields() );

  // reject a missing reader before doing any other work
  if( input == null )
    throw new IOException( "input cannot be null" );

  String parentIdentifier = parent.getFullIdentifier( flowProcess );

  List<Iterator<Tuple>> entryIterators = new ArrayList<Iterator<Tuple>>();
  entryIterators.add( createPartitionEntryIterator( flowProcess, input, parentIdentifier ) );

  reset( entryIterators );
}
代码示例来源:origin: cwensel/cascading
/**
 * Lazily builds, caches and returns the map from each child tap's full identifier to the tap itself.
 * <p>
 * The map is assembled in a local variable and stored in {@code prefixMap} only after every
 * child has been added, so a failure while resolving an identifier cannot leave a partially
 * populated map cached for later calls.
 *
 * @param flowProcess flow process passed to each child's {@code getFullIdentifier}
 * @return the cached prefix map, built on first use
 */
protected Trie<Child> getTapPrefixMap( FlowProcess<? extends Config> flowProcess )
{
  if( prefixMap != null )
    return prefixMap;

  Trie<Child> built = new Trie<>();

  for( Child tap : taps )
    built.put( tap.getFullIdentifier( flowProcess ), tap );

  // publish only once fully populated
  prefixMap = built;

  return built;
}
代码示例来源:origin: cwensel/cascading
/**
 * Configures {@code conf} for writing text output through this scheme.
 * <p>
 * Refuses sinks whose full identifier ends in {@code .zip}, disables the new mapper api flag,
 * sets the output compression flag when sink compression is explicitly ENABLE or DISABLE, and
 * registers the output key, value and format classes.
 *
 * @param flowProcess current flow process (not used here)
 * @param tap         sink tap whose identifier is checked
 * @param conf        configuration to update
 * @throws IllegalStateException if the sink identifier ends with {@code .zip}
 */
@Override
public void sinkConfInit( FlowProcess<? extends Configuration> flowProcess, Tap<Configuration, RecordReader, OutputCollector> tap, Configuration conf )
{
  String sinkIdentifier = tap.getFullIdentifier( conf );

  if( sinkIdentifier.endsWith( ".zip" ) )
    throw new IllegalStateException( "cannot write zip files: " + HadoopUtil.getOutputPath( conf ) );

  conf.setBoolean( "mapred.mapper.new-api", false );

  // any other compression setting leaves the flag untouched
  Compress compression = getSinkCompression();

  if( compression == Compress.DISABLE || compression == Compress.ENABLE )
    conf.setBoolean( "mapred.output.compress", compression == Compress.ENABLE );

  conf.setClass( "mapred.output.key.class", Text.class, Object.class );
  conf.setClass( "mapred.output.value.class", Text.class, Object.class );
  conf.setClass( "mapred.output.format.class", TextOutputFormat.class, OutputFormat.class );
}
代码示例来源:origin: cascading/cascading-hadoop2-io
/**
 * Configures {@code conf} for writing text output through this scheme.
 * <p>
 * Refuses sinks whose full identifier ends in {@code .zip}, disables the new mapper api flag,
 * sets the output compression flag when sink compression is explicitly ENABLE or DISABLE, and
 * registers the output key, value and format classes.
 *
 * @param flowProcess current flow process (not used here)
 * @param tap         sink tap whose identifier is checked
 * @param conf        configuration to update
 * @throws IllegalStateException if the sink identifier ends with {@code .zip}
 */
@Override
public void sinkConfInit( FlowProcess<? extends Configuration> flowProcess, Tap<Configuration, RecordReader, OutputCollector> tap, Configuration conf )
{
  String sinkIdentifier = tap.getFullIdentifier( conf );

  if( sinkIdentifier.endsWith( ".zip" ) )
    throw new IllegalStateException( "cannot write zip files: " + HadoopUtil.getOutputPath( conf ) );

  conf.setBoolean( "mapred.mapper.new-api", false );

  // any other compression setting leaves the flag untouched
  Compress compression = getSinkCompression();

  if( compression == Compress.DISABLE || compression == Compress.ENABLE )
    conf.setBoolean( "mapred.output.compress", compression == Compress.ENABLE );

  conf.setClass( "mapred.output.key.class", Text.class, Object.class );
  conf.setClass( "mapred.output.value.class", Text.class, Object.class );
  conf.setClass( "mapred.output.format.class", TextOutputFormat.class, OutputFormat.class );
}
代码示例来源:origin: cwensel/cascading
/**
 * Attempts to commit every tap held in {@code traps}.
 * <p>
 * A commit that reports failure, or that raises an {@link IOException}, is logged and the
 * loop moves on to the next trap.
 */
private void commitTraps()
{
  for( Tap trap : traps.values() )
  {
    try
    {
      boolean committed = trap.commitResource( getConfig() );

      if( committed )
        continue;

      logError( "unable to commit trap: " + trap.getFullIdentifier( getConfig() ) );
    }
    catch( IOException exception )
    {
      // best effort: record the failure and keep committing the remaining traps
      logError( "unable to commit trap: " + trap.getFullIdentifier( getConfig() ), exception );
    }
  }
}
代码示例来源:origin: cascading/lingual-core
/**
 * Returns the child identifiers of the given file type, resolved with the system configuration.
 *
 * @param fileType resource to list; it is also cast to {@code Tap} for the existence check
 * @return the child identifiers reported by {@code fileType}
 * @throws IOException           declared for the underlying resource calls
 * @throws IllegalStateException if the resource does not exist
 */
public String[] getChildIdentifiers( FileType<Config> fileType ) throws IOException
{
  Tap tap = (Tap) fileType;

  if( tap.resourceExists( getSystemConfig() ) )
    return fileType.getChildIdentifiers( getSystemConfig() );

  throw new IllegalStateException( "resource does not exist: " + tap.getFullIdentifier( getSystemConfig() ) );
}
代码示例来源:origin: cwensel/cascading
/**
 * Method deleteSinksIfReplace deletes all sinks that are configured with the
 * {@link cascading.tap.SinkMode#REPLACE} flag.
 * <p>
 * Before anything is deleted, every sink is checked: a sink in keep mode whose resource
 * already exists aborts the whole operation.
 *
 * @throws IOException declared for the underlying resource checks and deletes
 */
public void deleteSinksIfReplace() throws IOException
{
  // first pass: validate every sink so nothing is deleted if any sink would be overwritten
  for( Tap sink : sinks.values() )
  {
    if( !sink.isKeep() )
      continue;

    if( !sink.resourceExists( getConfig() ) )
      continue;

    throw new FlowTapException( "resource exists and sink mode is KEEP, cannot overwrite: " + sink.getFullIdentifier( getFlowProcess() ) );
  }

  // second pass: remove the sinks marked for replacement
  for( Tap sink : sinks.values() )
  {
    if( !sink.isReplace() )
      continue;

    deleteOrFail( sink );
  }
}
代码示例来源:origin: cwensel/cascading
/**
 * Commits the given sink and reports any failure as a value instead of throwing.
 *
 * @param tap sink to commit
 * @return {@code null} when the commit succeeds, otherwise a {@code FlowException} describing
 * the failure (wrapping the caught throwable when one was raised)
 */
private Throwable commitResource( Tap tap )
{
  try
  {
    if( tap.commitResource( getConfig() ) )
      return null;

    String failure = "unable to commit sink: " + tap.getFullIdentifier( getConfig() );

    logError( failure );

    return new FlowException( failure );
  }
  catch( Throwable exception )
  {
    String failure = "unable to commit sink: " + tap.getFullIdentifier( getConfig() );

    logError( failure, exception );

    return new FlowException( failure, exception );
  }
}
代码示例来源:origin: cwensel/cascading
/**
 * Deletes the tap's resource if it exists, failing when the delete is reported as unsuccessful.
 *
 * @param tap tap whose resource should be removed
 * @throws IOException      declared for the underlying resource calls
 * @throws FlowTapException if the resource exists but could not be deleted
 */
private void deleteOrFail( Tap tap ) throws IOException
{
  boolean present = tap.resourceExists( getConfig() );

  // the delete is attempted only when the resource is present
  if( present && !tap.deleteResource( getConfig() ) )
    throw new FlowTapException( "unable to delete resource: " + tap.getFullIdentifier( getFlowProcess() ) );
}
代码示例来源:origin: cwensel/cascading
/**
 * Returns the source fields, first deriving them from the first line of the resource when
 * {@code skipHeader} is set and the declared source fields are unknown.
 *
 * @param flowProcess flow process used to resolve the identifier and to read the first line
 * @param tap         tap to read from; for a composite tap only the first child is consulted
 * @return the current source fields, possibly just parsed from the header line
 */
@Override
public Fields retrieveSourceFields( FlowProcess<? extends Configuration> flowProcess, Tap tap )
{
  boolean mustParseHeader = skipHeader && getSourceFields().isUnknown();

  if( !mustParseHeader )
    return getSourceFields();

  Tap headerTap = tap;

  // no need to open them all
  if( headerTap instanceof CompositeTap )
    headerTap = (Tap) ( (CompositeTap) headerTap ).getChildTaps().next();

  TextLine lineScheme = new TextLine( new Fields( "line" ), charsetName );

  // should revert to file:// (Lfs) if tap is Lfs
  if( headerTap instanceof TapWith )
    headerTap = ( (TapWith) headerTap ).withScheme( lineScheme ).asTap();
  else
    headerTap = new Hfs( lineScheme, headerTap.getFullIdentifier( flowProcess ) );

  setSourceFields( delimitedParser.parseFirstLine( flowProcess, headerTap ) );

  return getSourceFields();
}
代码示例来源:origin: cascading/cascading-hadoop2-io
/**
 * Returns the source fields, first deriving them from the first line of the resource when
 * {@code skipHeader} is set and the declared source fields are unknown.
 *
 * @param flowProcess flow process used to resolve the identifier and to read the first line
 * @param tap         tap to read from; for a composite tap only the first child is consulted
 * @return the current source fields, possibly just parsed from the header line
 */
@Override
public Fields retrieveSourceFields( FlowProcess<? extends Configuration> flowProcess, Tap tap )
{
  boolean mustParseHeader = skipHeader && getSourceFields().isUnknown();

  if( !mustParseHeader )
    return getSourceFields();

  Tap headerTap = tap;

  // no need to open them all
  if( headerTap instanceof CompositeTap )
    headerTap = (Tap) ( (CompositeTap) headerTap ).getChildTaps().next();

  TextLine lineScheme = new TextLine( new Fields( "line" ), charsetName );

  // should revert to file:// (Lfs) if tap is Lfs
  if( headerTap instanceof TapWith )
    headerTap = ( (TapWith) headerTap ).withScheme( lineScheme ).asTap();
  else
    headerTap = new Hfs( lineScheme, headerTap.getFullIdentifier( flowProcess ) );

  setSourceFields( delimitedParser.parseFirstLine( flowProcess, headerTap ) );

  return getSourceFields();
}
代码示例来源:origin: cwensel/cascading
/**
 * Checks that an Lfs tap built from a relative path reports a {@code file} scheme in its full
 * identifier, that an explicit {@code file://} url can be constructed, and that constructing
 * one from an {@code s3://} url is expected to throw.
 */
@Test
public void testLfs() throws URISyntaxException, IOException
{
  Tap tap = new Lfs( new SequenceFile( new Fields( "foo" ) ), "some/path" );
  String path = tap.getFullIdentifier( getPlatform().getFlowProcess() );

  // literal first: a missing (null) scheme then fails with "wrong scheme" instead of a NullPointerException
  assertTrue( "wrong scheme", "file".equalsIgnoreCase( new Path( path ).toUri().getScheme() ) );

  // constructing with an explicit file url must not throw
  new Lfs( new SequenceFile( new Fields( "foo" ) ), "file:///some/path" );

  try
  {
    new Lfs( new SequenceFile( new Fields( "foo" ) ), "s3://localhost:5001/some/path" );
    fail( "not valid url" );
  }
  catch( Exception expected )
  {
    // intentionally ignored: the constructor is expected to reject this url
  }
}
代码示例来源:origin: cascading/cascading-hadoop2-common
/**
 * Checks that an Lfs tap built from a relative path reports a {@code file} scheme in its full
 * identifier, that an explicit {@code file://} url can be constructed, and that constructing
 * one from an {@code s3://} url is expected to throw.
 */
@Test
public void testLfs() throws URISyntaxException, IOException
{
  Tap tap = new Lfs( new SequenceFile( new Fields( "foo" ) ), "some/path" );
  String path = tap.getFullIdentifier( getPlatform().getFlowProcess() );

  // literal first: a missing (null) scheme then fails with "wrong scheme" instead of a NullPointerException
  assertTrue( "wrong scheme", "file".equalsIgnoreCase( new Path( path ).toUri().getScheme() ) );

  // constructing with an explicit file url must not throw
  new Lfs( new SequenceFile( new Fields( "foo" ) ), "file:///some/path" );

  try
  {
    new Lfs( new SequenceFile( new Fields( "foo" ) ), "s3://localhost:5001/some/path" );
    fail( "not valid url" );
  }
  catch( Exception expected )
  {
    // intentionally ignored: the constructor is expected to reject this url
  }
}
内容来源于网络,如有侵权,请联系作者删除!