cascading.tap.Tap.getFullIdentifier()方法的使用及代码示例

x33g5p2x  于2022-01-30 转载在 其他  
字(8.8k)|赞(0)|评价(0)|浏览(103)

本文整理了Java中cascading.tap.Tap.getFullIdentifier()方法的一些代码示例,展示了Tap.getFullIdentifier()的具体用法。这些代码示例主要来源于GitHub/Stack Overflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。Tap.getFullIdentifier()方法的具体详情如下:
包路径:cascading.tap.Tap
类名称:Tap
方法名:getFullIdentifier

Tap.getFullIdentifier介绍

[英]Method getFullIdentifier returns a fully qualified resource identifier.
[中]方法getFullIdentifier返回完全限定的资源标识符。

代码示例

代码示例来源:origin: cwensel/cascading

/**
 * Returns the full identifier by delegating to {@code original}.
 *
 * @param conf the configuration handed through to {@code original}
 * @return the value of {@code original.getFullIdentifier( conf )}
 */
@Override
public String getFullIdentifier( Config conf )
 {
 String fullIdentifier = original.getFullIdentifier( conf );

 return fullIdentifier;
 }

代码示例来源:origin: cwensel/cascading

/**
 * Returns the full identifier by delegating to {@code original}.
 *
 * @param flowProcess the flow process handed through to {@code original}
 * @return the value of {@code original.getFullIdentifier( flowProcess )}
 */
@Override
public String getFullIdentifier( FlowProcess<? extends Config> flowProcess )
 {
 String fullIdentifier = original.getFullIdentifier( flowProcess );

 return fullIdentifier;
 }

代码示例来源:origin: cwensel/cascading

/**
 * Returns the full identifier reported by {@code original}, after passing the
 * given configuration through {@code configProvider}.
 *
 * @param conf the configuration to hand to {@code configProvider.apply}
 * @return the identifier {@code original} reports for the resulting configuration
 */
@Override
public String getFullIdentifier( TConfig conf )
 {
 String fullIdentifier = original.getFullIdentifier( configProvider.apply( conf ) );

 return fullIdentifier;
 }

代码示例来源:origin: cwensel/cascading

/**
 * Returns the full identifier reported by {@code original}, after passing the
 * given flow process through {@code processProvider}.
 *
 * @param flowProcess the flow process to hand to {@code processProvider.apply}
 * @return the identifier {@code original} reports for the resulting flow process
 */
@Override
public String getFullIdentifier( FlowProcess<? extends TConfig> flowProcess )
 {
 String fullIdentifier = original.getFullIdentifier( processProvider.apply( flowProcess ) );

 return fullIdentifier;
 }

代码示例来源:origin: cwensel/cascading

/**
 * Returns the vertex name for a tap: its full identifier, resolved with the flow's config.
 *
 * @param flow the flow whose config is passed to the tap
 * @param tap  the tap to identify
 * @return the value of {@code tap.getFullIdentifier( flow.getConfig() )}
 */
protected String getVertex( Flow flow, Tap tap )
 {
 String vertex = tap.getFullIdentifier( flow.getConfig() );

 return vertex;
 }
}

代码示例来源:origin: cwensel/cascading

/**
 * Method getFullIdentifier returns a fully qualified resource identifier.
 * <p>
 * This overload takes the config from the given flow process and forwards it to
 * the config-based {@code getFullIdentifier} overload.
 *
 * @param flowProcess of type FlowProcess, the source of the config
 * @return String
 */
public String getFullIdentifier( FlowProcess<? extends Config> flowProcess )
 {
 Config config = flowProcess.getConfig();

 return getFullIdentifier( config );
 }

代码示例来源:origin: cwensel/cascading

/**
 * Builds an iterator backed by a single partition entry iterator over the given record reader.
 *
 * @param flowProcess the current flow process, used to resolve the parent identifier
 * @param input       the record reader to iterate over, must not be null
 * @throws IOException if {@code input} is null
 */
public CombinePartitionIterator( final FlowProcess<? extends Configuration> flowProcess, RecordReader input ) throws IOException
 {
 super( getSourceFields() );

 if( input == null )
  {
  throw new IOException( "input cannot be null" );
  }

 // the parent's identifier is handed to the partition entry iterator
 String parentIdentifier = parent.getFullIdentifier( flowProcess );

 List<Iterator<Tuple>> partitionIterators = new ArrayList<Iterator<Tuple>>();
 partitionIterators.add( createPartitionEntryIterator( flowProcess, input, parentIdentifier ) );

 reset( partitionIterators );
 }

代码示例来源:origin: cascading/cascading-hadoop2-io

/**
 * Builds an iterator backed by a single partition entry iterator over the given record reader.
 *
 * @param flowProcess the current flow process, used to resolve the parent identifier
 * @param input       the record reader to iterate over, must not be null
 * @throws IOException if {@code input} is null
 */
public CombinePartitionIterator( final FlowProcess<? extends Configuration> flowProcess, RecordReader input ) throws IOException
 {
 super( getSourceFields() );

 if( input == null )
  {
  throw new IOException( "input cannot be null" );
  }

 // the parent's identifier is handed to the partition entry iterator
 String parentIdentifier = parent.getFullIdentifier( flowProcess );

 List<Iterator<Tuple>> partitionIterators = new ArrayList<Iterator<Tuple>>();
 partitionIterators.add( createPartitionEntryIterator( flowProcess, input, parentIdentifier ) );

 reset( partitionIterators );
 }

代码示例来源:origin: cwensel/cascading

/**
 * Returns the trie mapping each child tap's full identifier to that tap.
 * <p>
 * The trie is built on the first call and stored in {@code prefixMap}; later
 * calls return the stored instance without consulting {@code flowProcess}.
 *
 * @param flowProcess the flow process used to resolve each child's full identifier
 * @return the stored identifier-to-tap trie
 */
protected Trie<Child> getTapPrefixMap( FlowProcess<? extends Config> flowProcess )
 {
 if( prefixMap == null )
  {
  prefixMap = new Trie<>();

  for( Child child : taps )
   {
   prefixMap.put( child.getFullIdentifier( flowProcess ), child );
   }
  }

 return prefixMap;
 }

代码示例来源:origin: cwensel/cascading

/**
 * Configures {@code conf} for writing text output through the given sink tap.
 * <p>
 * Sinks whose full identifier ends with {@code .zip} are rejected. Otherwise
 * {@code mapred.mapper.new-api} is set to false, output compression is switched
 * off or on when the sink compression is {@code DISABLE} or {@code ENABLE}
 * (any other value leaves the property untouched), and the key, value and
 * output format classes are registered.
 *
 * @param flowProcess the current flow process (not read by this implementation)
 * @param tap         the sink tap being configured
 * @param conf        the configuration to update
 * @throws IllegalStateException if the tap's full identifier ends with {@code .zip}
 */
@Override
public void sinkConfInit( FlowProcess<? extends Configuration> flowProcess, Tap<Configuration, RecordReader, OutputCollector> tap, Configuration conf )
 {
 String sinkIdentifier = tap.getFullIdentifier( conf );

 if( sinkIdentifier.endsWith( ".zip" ) )
  {
  throw new IllegalStateException( "cannot write zip files: " + HadoopUtil.getOutputPath( conf ) );
  }

 conf.setBoolean( "mapred.mapper.new-api", false );

 // only an explicit DISABLE or ENABLE overrides the compression property
 if( getSinkCompression() == Compress.DISABLE )
  {
  conf.setBoolean( "mapred.output.compress", false );
  }
 else if( getSinkCompression() == Compress.ENABLE )
  {
  conf.setBoolean( "mapred.output.compress", true );
  }

 conf.setClass( "mapred.output.key.class", Text.class, Object.class );
 conf.setClass( "mapred.output.value.class", Text.class, Object.class );
 conf.setClass( "mapred.output.format.class", TextOutputFormat.class, OutputFormat.class );
 }

代码示例来源:origin: cascading/cascading-hadoop2-io

/**
 * Configures {@code conf} for writing text output through the given sink tap.
 * <p>
 * Sinks whose full identifier ends with {@code .zip} are rejected. Otherwise
 * {@code mapred.mapper.new-api} is set to false, output compression is switched
 * off or on when the sink compression is {@code DISABLE} or {@code ENABLE}
 * (any other value leaves the property untouched), and the key, value and
 * output format classes are registered.
 *
 * @param flowProcess the current flow process (not read by this implementation)
 * @param tap         the sink tap being configured
 * @param conf        the configuration to update
 * @throws IllegalStateException if the tap's full identifier ends with {@code .zip}
 */
@Override
public void sinkConfInit( FlowProcess<? extends Configuration> flowProcess, Tap<Configuration, RecordReader, OutputCollector> tap, Configuration conf )
 {
 String sinkIdentifier = tap.getFullIdentifier( conf );

 if( sinkIdentifier.endsWith( ".zip" ) )
  {
  throw new IllegalStateException( "cannot write zip files: " + HadoopUtil.getOutputPath( conf ) );
  }

 conf.setBoolean( "mapred.mapper.new-api", false );

 // only an explicit DISABLE or ENABLE overrides the compression property
 if( getSinkCompression() == Compress.DISABLE )
  {
  conf.setBoolean( "mapred.output.compress", false );
  }
 else if( getSinkCompression() == Compress.ENABLE )
  {
  conf.setBoolean( "mapred.output.compress", true );
  }

 conf.setClass( "mapred.output.key.class", Text.class, Object.class );
 conf.setClass( "mapred.output.value.class", Text.class, Object.class );
 conf.setClass( "mapred.output.format.class", TextOutputFormat.class, OutputFormat.class );
 }

代码示例来源:origin: cwensel/cascading

/**
 * Commits every trap tap on a best-effort basis.
 * <p>
 * A commit that returns false or throws an {@link IOException} is reported
 * through {@code logError}; the loop then moves on to the next trap.
 */
private void commitTraps()
 {
 for( Tap trap : traps.values() )
  {
  try
   {
   boolean committed = trap.commitResource( getConfig() );

   if( committed )
    continue;

   logError( "unable to commit trap: " + trap.getFullIdentifier( getConfig() ) );
   }
  catch( IOException failure )
   {
   // log and carry on so the remaining traps still get committed
   logError( "unable to commit trap: " + trap.getFullIdentifier( getConfig() ), failure );
   }
  }
 }

代码示例来源:origin: cascading/lingual-core

/**
 * Returns the child identifiers of the given file type, after checking that
 * its resource exists.
 *
 * @param fileType the file type to inspect; it is also cast to {@code Tap}
 * @return the result of {@code fileType.getChildIdentifiers} for the system config
 * @throws IOException           declared for the resource lookups made here
 * @throws IllegalStateException if the resource does not exist
 */
public String[] getChildIdentifiers( FileType<Config> fileType ) throws IOException
 {
 Tap tap = (Tap) fileType;

 if( tap.resourceExists( getSystemConfig() ) )
  {
  return fileType.getChildIdentifiers( getSystemConfig() );
  }

 throw new IllegalStateException( "resource does not exist: " + tap.getFullIdentifier( getSystemConfig() ) );
 }

代码示例来源:origin: cwensel/cascading

/**
 * Method deleteSinksIfReplace deletes all sinks that are configured with the {@link cascading.tap.SinkMode#REPLACE} flag.
 * <p>
 * Before anything is deleted, every sink is checked: an existing resource on a
 * sink in KEEP mode aborts the whole operation with a {@code FlowTapException}.
 *
 * @throws IOException propagated from the tap resource calls made here
 */
public void deleteSinksIfReplace() throws IOException
 {
 // first pass: refuse to continue if any KEEP sink already exists, before anything is deleted
 for( Tap sink : sinks.values() )
  {
  if( !sink.isKeep() || !sink.resourceExists( getConfig() ) )
   continue;

  throw new FlowTapException( "resource exists and sink mode is KEEP, cannot overwrite: " + sink.getFullIdentifier( getFlowProcess() ) );
  }

 // second pass: remove the sinks flagged for replacement
 for( Tap sink : sinks.values() )
  {
  if( sink.isReplace() )
   {
   deleteOrFail( sink );
   }
  }
 }

代码示例来源:origin: cwensel/cascading

/**
 * Commits the given sink tap and reports a failure as a return value instead of throwing.
 *
 * @param tap the sink tap to commit
 * @return null when the commit succeeds; otherwise a {@code FlowException}
 *         describing the failure, wrapping the caught throwable if there was one
 */
private Throwable commitResource( Tap tap )
 {
 try
  {
  if( tap.commitResource( getConfig() ) )
   return null;

  String failure = "unable to commit sink: " + tap.getFullIdentifier( getConfig() );
  logError( failure );

  return new FlowException( failure );
  }
 catch( Throwable cause )
  {
  String failure = "unable to commit sink: " + tap.getFullIdentifier( getConfig() );
  logError( failure, cause );

  return new FlowException( failure, cause );
  }
 }

代码示例来源:origin: cwensel/cascading

/**
 * Deletes the tap's resource if it exists, failing when the delete is refused.
 *
 * @param tap the tap whose resource should be removed
 * @throws IOException      declared for the resource calls made here
 * @throws FlowTapException if the resource exists and {@code deleteResource} returns false
 */
private void deleteOrFail( Tap tap ) throws IOException
 {
 boolean exists = tap.resourceExists( getConfig() );

 // nothing to delete is not an error; only a refused delete is
 if( exists && !tap.deleteResource( getConfig() ) )
  {
  throw new FlowTapException( "unable to delete resource: " + tap.getFullIdentifier( getFlowProcess() ) );
  }
 }

代码示例来源:origin: cwensel/cascading

/**
 * Returns the source fields, reading them from the first line of the tap when
 * {@code skipHeader} is set and the declared source fields are unknown.
 * <p>
 * In that case the first line is parsed by {@code delimitedParser} through a
 * tap using a single-field ({@code "line"}) {@code TextLine} scheme, and the
 * result is stored with {@code setSourceFields}.
 *
 * @param flowProcess the current flow process
 * @param tap         the tap to read the header from
 * @return the current source fields
 */
@Override
public Fields retrieveSourceFields( FlowProcess<? extends Configuration> flowProcess, Tap tap )
 {
 boolean mustReadHeader = skipHeader && getSourceFields().isUnknown();

 if( !mustReadHeader )
  {
  return getSourceFields();
  }

 Tap headerTap = tap;

 // no need to open them all
 if( headerTap instanceof CompositeTap )
  {
  headerTap = (Tap) ( (CompositeTap) headerTap ).getChildTaps().next();
  }

 TextLine lineScheme = new TextLine( new Fields( "line" ), charsetName );

 // should revert to file:// (Lfs) if tap is Lfs
 if( headerTap instanceof TapWith )
  {
  headerTap = ( (TapWith) headerTap ).withScheme( lineScheme ).asTap();
  }
 else
  {
  headerTap = new Hfs( lineScheme, headerTap.getFullIdentifier( flowProcess ) );
  }

 setSourceFields( delimitedParser.parseFirstLine( flowProcess, headerTap ) );

 return getSourceFields();
 }

代码示例来源:origin: cascading/cascading-hadoop2-io

/**
 * Returns the source fields, reading them from the first line of the tap when
 * {@code skipHeader} is set and the declared source fields are unknown.
 * <p>
 * In that case the first line is parsed by {@code delimitedParser} through a
 * tap using a single-field ({@code "line"}) {@code TextLine} scheme, and the
 * result is stored with {@code setSourceFields}.
 *
 * @param flowProcess the current flow process
 * @param tap         the tap to read the header from
 * @return the current source fields
 */
@Override
public Fields retrieveSourceFields( FlowProcess<? extends Configuration> flowProcess, Tap tap )
 {
 boolean mustReadHeader = skipHeader && getSourceFields().isUnknown();

 if( !mustReadHeader )
  {
  return getSourceFields();
  }

 Tap headerTap = tap;

 // no need to open them all
 if( headerTap instanceof CompositeTap )
  {
  headerTap = (Tap) ( (CompositeTap) headerTap ).getChildTaps().next();
  }

 TextLine lineScheme = new TextLine( new Fields( "line" ), charsetName );

 // should revert to file:// (Lfs) if tap is Lfs
 if( headerTap instanceof TapWith )
  {
  headerTap = ( (TapWith) headerTap ).withScheme( lineScheme ).asTap();
  }
 else
  {
  headerTap = new Hfs( lineScheme, headerTap.getFullIdentifier( flowProcess ) );
  }

 setSourceFields( delimitedParser.parseFirstLine( flowProcess, headerTap ) );

 return getSourceFields();
 }

代码示例来源:origin: cwensel/cascading

/**
 * Checks that an {@code Lfs} tap on a relative path reports a {@code file}
 * scheme in its full identifier, that a {@code file:///} path is accepted, and
 * that constructing an {@code Lfs} with an {@code s3://} URL throws.
 */
@Test
public void testLfs() throws URISyntaxException, IOException
 {
 Tap relativeTap = new Lfs( new SequenceFile( new Fields( "foo" ) ), "some/path" );
 String fullIdentifier = relativeTap.getFullIdentifier( getPlatform().getFlowProcess() );
 String scheme = new Path( fullIdentifier ).toUri().getScheme();

 assertTrue( "wrong scheme", scheme.equalsIgnoreCase( "file" ) );

 // an explicit file:// url must construct without error
 new Lfs( new SequenceFile( new Fields( "foo" ) ), "file:///some/path" );

 try
  {
  new Lfs( new SequenceFile( new Fields( "foo" ) ), "s3://localhost:5001/some/path" );
  fail( "not valid url" );
  }
 catch( Exception ignored )
  {
  // expected: the constructor is supposed to throw for this url
  }
 }

代码示例来源:origin: cascading/cascading-hadoop2-common

/**
 * Checks that an {@code Lfs} tap on a relative path reports a {@code file}
 * scheme in its full identifier, that a {@code file:///} path is accepted, and
 * that constructing an {@code Lfs} with an {@code s3://} URL throws.
 */
@Test
public void testLfs() throws URISyntaxException, IOException
 {
 Tap relativeTap = new Lfs( new SequenceFile( new Fields( "foo" ) ), "some/path" );
 String fullIdentifier = relativeTap.getFullIdentifier( getPlatform().getFlowProcess() );
 String scheme = new Path( fullIdentifier ).toUri().getScheme();

 assertTrue( "wrong scheme", scheme.equalsIgnoreCase( "file" ) );

 // an explicit file:// url must construct without error
 new Lfs( new SequenceFile( new Fields( "foo" ) ), "file:///some/path" );

 try
  {
  new Lfs( new SequenceFile( new Fields( "foo" ) ), "s3://localhost:5001/some/path" );
  fail( "not valid url" );
  }
 catch( Exception ignored )
  {
  // expected: the constructor is supposed to throw for this url
  }
 }

相关文章