Usage of the cascading.tap.Tap Class, with Code Examples

Reposted by x33g5p2x on 2022-01-30 in: Other

This article collects code examples for the Java class cascading.tap.Tap and shows how the class is used in practice. The examples come from selected open-source projects on GitHub, Stack Overflow, Maven, and similar platforms, and should serve as useful references. Details of the Tap class:

Package: cascading.tap
Class name: Tap

About Tap

A Tap represents the physical data source or sink in a connected cascading.flow.Flow.

That is, a source Tap is the head end of a connected Pipe and Tuple stream, and a sink Tap is the tail end. Kinds of Tap types are used to manage files from a local disk, distributed disk, remote storage like Amazon S3, or via FTP. It simply abstracts out the complexity of connecting to these types of data sources.

A Tap takes a Scheme instance, which is used to identify the type of resource (text file, binary file, etc). A Tap is responsible for how the resource is reached.

By default when planning a Flow, Tap equality is a function of the #getIdentifier() and #getScheme() values. That is, two Tap instances are the same Tap instance if they sink/source the same resource and sink/source the same fields.

Some more advanced taps, like a database tap, may need to extend equality to include any filtering, like the where clause in a SQL statement so two taps reading from the same SQL table aren't considered equal.

Taps are also used to determine dependencies between two or more Flow instances when used with a cascading.cascade.Cascade. In that case the #getFullIdentifier(Object) value is used and the Scheme is ignored.
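The default equality described above — identity as a function of the identifier and scheme values — can be sketched in plain Java. The `SimpleTap` class below is a hypothetical stand-in for illustration only, not part of Cascading:

```java
import java.util.Objects;

// Hypothetical stand-in illustrating Tap's default equality:
// two taps are "the same" if they address the same resource
// (identifier) with the same fields/format (scheme).
class SimpleTap {
    private final String identifier; // e.g. a file path or URL
    private final String scheme;     // e.g. "TextLine[fields: line]"

    SimpleTap(String identifier, String scheme) {
        this.identifier = identifier;
        this.scheme = scheme;
    }

    @Override
    public boolean equals(Object other) {
        if (this == other) return true;
        if (!(other instanceof SimpleTap)) return false;
        SimpleTap that = (SimpleTap) other;
        return Objects.equals(identifier, that.identifier)
            && Objects.equals(scheme, that.scheme);
    }

    @Override
    public int hashCode() {
        return Objects.hash(identifier, scheme);
    }

    public static void main(String[] args) {
        SimpleTap a = new SimpleTap("hdfs://data/logs", "TextLine");
        SimpleTap b = new SimpleTap("hdfs://data/logs", "TextLine");
        SimpleTap c = new SimpleTap("hdfs://data/other", "TextLine");
        System.out.println(a.equals(b)); // same resource, same scheme
        System.out.println(a.equals(c)); // different resource
    }
}
```

As the text notes, more specialized taps (a database tap with a WHERE clause, for example) would fold their extra filtering state into this same equals/hashCode pair.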

Code Examples

Code example source: elastic/elasticsearch-hadoop

@Override
public TupleEntryIterator openForRead(FlowProcess<Object> flowProcess, Object input) throws IOException {
  initInnerTapIfNotSetFromFlowProcess(flowProcess);
  return actualTap.openForRead(flowProcess, input);
}

Code example source: elastic/elasticsearch-hadoop

@Override
public void sourceConfInit(FlowProcess<Object> flowProcess, Object conf) {
  initInnerTapIfNotSetFromFlowProcess(flowProcess);
  actualTap.sourceConfInit(flowProcess, conf);
}

Code example source: elastic/elasticsearch-hadoop

private void initInnerTapIfNotSet(Object target, String hadoopTypeName) {
    if (actualTap != null) {
      return;
    }

    Class<?> clz = null;
    try {
      clz = Class.forName(hadoopTypeName, false, getClass().getClassLoader());
      if (clz.isInstance(target)) {
        runningInHadoop = true;
      }
    } catch (ClassNotFoundException e) {
      runningInHadoop = false;
    }
    actualTap = (runningInHadoop ? new EsHadoopTap(host, port, resource, query, fields, props) : new EsLocalTap(host, port, resource, query, fields, props));
    setScheme(actualTap.getScheme());
    if (log.isDebugEnabled()) {
      log.debug(String.format("Detected %s environment; initializing [%s]", (runningInHadoop ? "Hadoop" : "local"), actualTap.getClass().getSimpleName()));
    }

    // use SLF4J just like Cascading
    if (!logVersion) {
      logVersion = true;
      Logger esTapLogger = LoggerFactory.getLogger(EsTap.class);
      esTapLogger.info(String.format("Elasticsearch Hadoop %s initialized", Version.version()));
      esTapLogger.warn("ES-Hadoop Cascading Integration is Deprecated as of 6.6.0 and will be removed in a later release.");
    }
  }
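The snippet above decides between a Hadoop and a local tap by probing the classpath with `Class.forName(..., false, ...)`, which checks for the class without initializing it. The same detection pattern, reduced to self-contained Java (the probed class names are only illustrative):

```java
public class ClasspathProbe {

    // Returns true if the named class can be loaded, without
    // initializing it (mirrors the Class.forName(..., false, ...) probe).
    static boolean isPresent(String className) {
        try {
            Class.forName(className, false, ClasspathProbe.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Always present: part of the JDK.
        System.out.println(isPresent("java.util.ArrayList"));
        // Absent unless Hadoop is on the classpath.
        System.out.println(isPresent("org.apache.hadoop.mapred.JobConf"));
    }
}
```

Passing `false` for the `initialize` flag keeps the probe cheap and side-effect free: static initializers of the probed class never run.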

Code example source: cwensel/cascading

@Test
public void testSkipStrategiesKeep() throws Exception
 {
 getPlatform().copyFromLocal( inputFileApache );
 Tap source = getPlatform().getTextFile( inputFileApache );
 // !!! enable replace
 Tap sink = getPlatform().getTextFile( getOutputPath( "keep" ), SinkMode.KEEP );
 Pipe pipe = new Pipe( "test" );
 pipe = new Each( pipe, new Fields( "line" ), new RegexParser( new Fields( "ip" ), "^[^ ]*" ), new Fields( "ip" ) );
 Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe );
 sink.deleteResource( flow.getConfig() );
 assertTrue( "default skip", !flow.getFlowSkipStrategy().skipFlow( flow ) );
 assertTrue( "exist skip", !new FlowSkipIfSinkExists().skipFlow( flow ) );
 flow.complete();
 assertTrue( "default skip", flow.getFlowSkipStrategy().skipFlow( flow ) );
 assertTrue( "exist skip", new FlowSkipIfSinkExists().skipFlow( flow ) );
 validateLength( flow.openSource(), 10 ); // validate source, this once, as a sanity check
 validateLength( flow, 10, null );
 }

Code example source: cwensel/cascading

private void runPartitionTest( String postfix ) throws IOException
 {
 getPlatform().copyFromLocal( inputFileCrossX2 );
 Tap source = getPlatform().getDelimitedFile( new Fields( "number", "lower", "upper" ), " ", inputFileCrossX2 );
 Tap partitionTap = getPlatform().getDelimitedFile( new Fields( "upper" ), "+", getOutputPath( "/partitioned" ), SinkMode.REPLACE );
 Partition partition = new DelimitedPartition( new Fields( "lower", "number" ), "/", postfix );
 partitionTap = getPlatform().getPartitionTap( partitionTap, partition, 1 );
 Flow firstFlow = getPlatform().getFlowConnector().connect( source, partitionTap, new Pipe( "partition" ) );
 firstFlow.complete();
 Tap sink = getPlatform().getDelimitedFile( new Fields( "number", "lower", "upper" ), "+", getOutputPath( "/final" ), SinkMode.REPLACE );
 Flow secondFlow = getPlatform().getFlowConnector().connect( partitionTap, sink, new Pipe( "copy" ) );
 secondFlow.complete();
 Tap test = getPlatform().getTextFile( new Fields( "line" ), partitionTap.getIdentifier().toString() + "/a/1" + postfix );
 validateLength( firstFlow.openTapForRead( test ), 6, Pattern.compile( "[A-Z]" ) );
 test = getPlatform().getTextFile( new Fields( "line" ), partitionTap.getIdentifier().toString() + "/b/2" + postfix );
 validateLength( firstFlow.openTapForRead( test ), 6, Pattern.compile( "[A-Z]" ) );
 List<Tuple> tuples = asList( firstFlow, partitionTap );
 assertEquals( 2, Collections.frequency( tuples, new Tuple( "A", "a", "1" ) ) );
 assertEquals( 2, Collections.frequency( tuples, new Tuple( "B", "b", "2" ) ) );
 test = getPlatform().getTextFile( new Fields( "line" ), sink.getIdentifier() );
 validateLength( secondFlow.openTapForRead( test ), 74, Pattern.compile( "[0-9]\\+[a-z]\\+[A-Z]" ) );
 }

Code example source: cwensel/cascading

private void runComprehensiveCase( Boolean[] testCase, boolean useCollectionsComparator ) throws IOException
 {
 getPlatform().copyFromLocal( inputFileCrossNulls );
 String test = Util.join( testCase, "_", true ) + "_" + useCollectionsComparator;
 String path = "comprehensive/" + test;
 Tap source = getPlatform().getTextFile( new Fields( "line" ), inputFileCrossNulls );
 Tap sink = getPlatform().getDelimitedFile( new Fields( "num", "lower", "upper" ).applyTypes( Long.class, String.class, String.class ), " ", getOutputPath( path ), SinkMode.REPLACE );
 sink.getScheme().setNumSinkParts( 1 );
 Pipe pipe = new Pipe( "comprehensivesort" );
 pipe = new Each( pipe, new Fields( "line" ), new RegexSplitter( new Fields( "num", "lower", "upper" ), "\\s" ) );
 pipe = new Each( pipe, new Fields( "num" ), new Identity( Long.class ), Fields.REPLACE );
 Fields groupFields = new Fields( "num" );
 if( testCase[ 0 ] )
  groupFields.setComparator( "num", useCollectionsComparator ? new NullSafeReverseComparator() : getPlatform().getLongComparator( true ) );
 Fields sortFields = null;
 if( testCase[ 1 ] != null )
  {
  sortFields = new Fields( "upper" );
  if( testCase[ 1 ] )
   sortFields.setComparator( "upper", useCollectionsComparator ? new NullSafeReverseComparator() : getPlatform().getStringComparator( true ) );
  }
 pipe = new GroupBy( pipe, groupFields, sortFields, testCase[ 2 ] );
 Map<Object, Object> properties = getProperties();
 if( getPlatform().isMapReduce() && getPlatform().getNumMapTasks( properties ) != null )
  getPlatform().setNumMapTasks( properties, 13 );
 Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe );
 flow.complete();
 validateCase( test, testCase, sink );
 }

Code example source: cwensel/cascading

@Test
public void testTrapTapSourceSink() throws Exception
 {
 getPlatform().copyFromLocal( inputFileApache );
 Scheme scheme = getPlatform().getTestFailScheme();
 Tap source = getPlatform().getTap( scheme, inputFileApache, SinkMode.KEEP );
 Pipe pipe = new Pipe( "map" );
 pipe = new Each( pipe, new Fields( "line" ), new RegexParser( new Fields( "ip" ), "^[^ ]*" ), new Fields( "ip" ) );
 pipe = new GroupBy( pipe, new Fields( "ip" ) );
 pipe = new Every( pipe, new Count(), new Fields( "ip", "count" ) );
 Tap sink = getPlatform().getTap( scheme, getOutputPath( "trapsourcesink/sink" ), SinkMode.REPLACE );
 Tap trap = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "trapsourcesink/trap" ), SinkMode.REPLACE );
 Map<Object, Object> properties = getProperties();
 // compensate for running in cluster mode
 getPlatform().setNumMapTasks( properties, 1 );
 getPlatform().setNumReduceTasks( properties, 1 );
 getPlatform().setNumGatherPartitionTasks( properties, 1 );
 Flow flow = getPlatform().getFlowConnector( properties ).connect( "trap test", source, sink, trap, pipe );
 flow.complete();
 validateLength( flow.openTapForRead( getPlatform().getTextFile( sink.getIdentifier() ) ), 7 );
 validateLength( flow.openTrap(), 2, Pattern.compile( "bad data" ) ); // confirm the payload is written
 }

Code example source: cwensel/cascading

private void assertHeaders( Tap output, Flow flow ) throws IOException
 {
 TupleEntryIterator iterator = flow.openTapForRead( getPlatform().getTextFile( new Fields( "line" ), output.getIdentifier() ) );
 assertEquals( iterator.next().getObject( 0 ), "first,second,third,fourth,fifth" );
 iterator.close();
 }

Code example source: cwensel/cascading

Tap tap = (Tap) element;

if( tap.getIdentifier() == null )
 throw new IllegalStateException( "tap may not have null identifier: " + tap.toString() );

tap.sourceConfInit( flowProcess, current );

Collection<String> paths = current.getStringCollection( CASCADING_LOCAL_RESOURCES + Tap.id( tap ) );
String resourceSubPath = Tap.id( tap );
Map<Path, Path> pathMap = TezUtil.addToClassPath( current, flowStagingPath, resourceSubPath, paths, LocalResourceType.FILE, taskLocalResources, null );
current.setStrings( CASCADING_REMOTE_RESOURCES + Tap.id( tap ), taskLocalResources.keySet().toArray( new String[ taskLocalResources.size() ] ) );
conf.set( "cascading.node.accumulated.source.conf." + Tap.id( tap ), pack( map, conf ) );

Code example source: cascading/cascading-platform

@Test
public void testCoGroupAroundCoGroupWith() throws Exception
 {
 // hack to get classname
 runCoGroupAroundCoGroup( getPlatform().getDelimitedFile( new Fields( "num" ), "\t", inputFileNums10 ).getScheme().getClass(), "cogroupacogroupopt2" );
 }

Code example source: cwensel/cascading

@Test
public void testMultiSourceIterator() throws Exception
 {
 getPlatform().copyFromLocal( inputFileLower );
 getPlatform().copyFromLocal( inputFileUpper );
 Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
 Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper );
 Tap source = new MultiSourceTap( sourceLower, sourceUpper );
 validateLength( source.openForRead( getPlatform().getFlowProcess() ), 10 );
 }

Code example source: cwensel/cascading

@Override
public int hashCode()
 {
 int result = getIdentifier() != null ? getIdentifier().hashCode() : 0;
 result = 31 * result + ( getScheme() != null ? getScheme().hashCode() : 0 );
 return result;
 }

Code example source: com.hotels/plunger

private TupleEntryIterator getHadoopTupleEntryIterator() throws IOException {
 @SuppressWarnings("unchecked")
 Tap<JobConf, ?, ?> hadoopTap = (Tap<JobConf, ?, ?>) source;
 JobConf conf = new JobConf();
 FlowProcess<JobConf> flowProcess = new HadoopFlowProcess(conf);
 hadoopTap.sourceConfInit(flowProcess, conf);
 return hadoopTap.openForRead(flowProcess);
}

Code example source: elastic/elasticsearch-hadoop

@Override
public TupleEntryCollector openForWrite(FlowProcess<Object> flowProcess, Object output) throws IOException {
  initInnerTapIfNotSetFromFlowProcess(flowProcess);
  return actualTap.openForWrite(flowProcess, output);
}

Code example source: com.twitter/maple

private void initialize() throws IOException {
 tap.sinkConfInit(hadoopFlowProcess, conf);
 OutputFormat outputFormat = conf.getOutputFormat();
 LOG.info("Output format class is: " + outputFormat.getClass().toString());
 writer = outputFormat.getRecordWriter(null, conf, tap.getIdentifier(), Reporter.NULL);
 sinkCall.setOutput(this);
}

Code example source: elastic/elasticsearch-hadoop

@Override
public void sinkConfInit(FlowProcess<Object> flowProcess, Object conf) {
  initInnerTapIfNotSetFromFlowProcess(flowProcess);
  actualTap.sinkConfInit(flowProcess, conf);
}

Code example source: cwensel/cascading

/**
 * Method getFullIdentifier returns a fully qualified resource identifier.
 *
 * @param conf of type Config
 * @return String
 */
public String getFullIdentifier( Config conf )
 {
 return getIdentifier();
 }

Code example source: com.hotels/plunger

private void writeToHadoopTap(Tap<?, ?, ?> tap) throws IOException {
 @SuppressWarnings("unchecked")
 Tap<JobConf, ?, ?> hadoopTap = (Tap<JobConf, ?, ?>) tap;
 JobConf conf = new JobConf();
 HadoopFlowProcess flowProcess = new HadoopFlowProcess(conf);
 hadoopTap.sinkConfInit(flowProcess, conf);
 TupleEntryCollector collector = hadoopTap.openForWrite(flowProcess);
 for (TupleEntry tuple : data.asTupleEntryList()) {
  collector.add(tuple);
 }
 collector.close();
}

Code example source: cwensel/cascading

private void deleteOrFail( Tap tap ) throws IOException
 {
 if( !tap.resourceExists( getConfig() ) )
  return;
 if( !tap.deleteResource( getConfig() ) )
  throw new FlowTapException( "unable to delete resource: " + tap.getFullIdentifier( getFlowProcess() ) );
 }
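The `deleteOrFail` helper above is a common delete-or-throw pattern: a missing resource is ignored, but an attempted deletion that does not succeed fails loudly. A self-contained analogue using `java.nio.file` (the snippet's `FlowTapException` is replaced here with a plain `IOException`):

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

public class DeleteOrFail {

    // Delete the path if it exists; fail if it cannot be removed.
    static void deleteOrFail(Path path) throws IOException {
        if (!Files.exists(path))
            return; // nothing to do, mirrors the tap.resourceExists() check
        if (!Files.deleteIfExists(path))
            throw new IOException("unable to delete resource: " + path);
    }

    public static void main(String[] args) throws IOException {
        Path tmp = Files.createTempFile("tap-demo", ".txt");
        deleteOrFail(tmp); // removes the file
        System.out.println(Files.exists(tmp));
        deleteOrFail(tmp); // missing resource: silently returns
        System.out.println("done");
    }
}
```

Cascading uses this shape when a sink is opened in SinkMode.REPLACE: the old resource must be gone before the flow writes, and an undeletable resource is a hard error rather than a silent overwrite.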

Code example source: cwensel/cascading

private static String createMessage( Tap tap, Fields incomingFields, Fields selectorFields )
 {
 String message = "unable to resolve scheme sink selector: " + selectorFields.printVerbose() +
  ", with incoming: " + incomingFields.printVerbose();
 return TraceUtil.formatTrace( tap.getScheme(), message );
 }
}
