This article collects code examples of the Java class cascading.tap.Tap and shows how the Tap class is used in practice. The examples are drawn from selected projects on platforms such as GitHub, Stack Overflow, and Maven, so they have solid reference value and should prove helpful. Details of the Tap class:

Package: cascading.tap.Tap
Class name: Tap
A Tap represents the physical data source or sink in a connected cascading.flow.Flow.
That is, a source Tap is the head end of a connected Pipe and Tuple stream, and a sink Tap is the tail end. Kinds of Tap types are used to manage files from a local disk, distributed disk, remote storage like Amazon S3, or via FTP. It simply abstracts out the complexity of connecting to these types of data sources.
A Tap takes a Scheme instance, which is used to identify the type of resource (text file, binary file, etc). A Tap is responsible for how the resource is reached.
By default when planning a Flow, Tap equality is a function of the #getIdentifier() and #getScheme() values. That is, two Tap instances are the same Tap instance if they sink/source the same resource and sink/source the same fields.
Some more advanced taps, like a database tap, may need to extend equality to include any filtering, like the where clause in a SQL statement so two taps reading from the same SQL table aren't considered equal.
Taps are also used to determine dependencies between two or more Flow instances when used with a cascading.cascade.Cascade. In that case the #getFullIdentifier(Object) value is used and the Scheme is ignored.
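The identifier-plus-scheme equality rule described above can be sketched in plain Java. The SimpleTap class below is a hypothetical stand-in used only for illustration (it is not part of the Cascading API); it treats the identifier as a path string and the scheme as a plain name, and combines them with the same 31-based hash pattern the real Tap#hashCode uses:

```java
import java.util.Objects;

// Hypothetical stand-in illustrating Tap's default equality contract:
// two instances are equal iff they share the same identifier and scheme.
class SimpleTap {
    private final String identifier; // e.g. a file path or URL
    private final String scheme;     // e.g. "TextLine"; stands in for a Scheme instance

    SimpleTap(String identifier, String scheme) {
        this.identifier = identifier;
        this.scheme = scheme;
    }

    @Override
    public boolean equals(Object other) {
        if (this == other)
            return true;
        if (!(other instanceof SimpleTap))
            return false;
        SimpleTap that = (SimpleTap) other;
        return Objects.equals(identifier, that.identifier)
            && Objects.equals(scheme, that.scheme);
    }

    @Override
    public int hashCode() {
        // same 31-based combination pattern as Tap#hashCode
        int result = identifier != null ? identifier.hashCode() : 0;
        return 31 * result + (scheme != null ? scheme.hashCode() : 0);
    }
}

public class TapEqualityDemo {
    public static void main(String[] args) {
        SimpleTap a = new SimpleTap("hdfs://data/input.txt", "TextLine");
        SimpleTap b = new SimpleTap("hdfs://data/input.txt", "TextLine");
        SimpleTap c = new SimpleTap("hdfs://data/other.txt", "TextLine");

        System.out.println(a.equals(b)); // true: same resource, same scheme
        System.out.println(a.equals(c)); // false: different identifier
    }
}
```

This is why a database tap, as noted above, must extend equality further: two taps over the same table and fields would compare equal here even if their WHERE clauses differ.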
Code example (origin: elastic/elasticsearch-hadoop)

@Override
public TupleEntryIterator openForRead(FlowProcess<Object> flowProcess, Object input) throws IOException {
    initInnerTapIfNotSetFromFlowProcess(flowProcess);
    return actualTap.openForRead(flowProcess, input);
}
Code example (origin: elastic/elasticsearch-hadoop)

@Override
public void sourceConfInit(FlowProcess<Object> flowProcess, Object conf) {
    initInnerTapIfNotSetFromFlowProcess(flowProcess);
    actualTap.sourceConfInit(flowProcess, conf);
}
Code example (origin: elastic/elasticsearch-hadoop)

private void initInnerTapIfNotSet(Object target, String hadoopTypeName) {
    if (actualTap != null) {
        return;
    }
    Class<?> clz = null;
    try {
        clz = Class.forName(hadoopTypeName, false, getClass().getClassLoader());
        if (clz.isInstance(target)) {
            runningInHadoop = true;
        }
    } catch (ClassNotFoundException e) {
        runningInHadoop = false;
    }
    actualTap = (runningInHadoop ? new EsHadoopTap(host, port, resource, query, fields, props) : new EsLocalTap(host, port, resource, query, fields, props));
    setScheme(actualTap.getScheme());
    if (log.isDebugEnabled()) {
        log.debug(String.format("Detected %s environment; initializing [%s]", (runningInHadoop ? "Hadoop" : "local"), actualTap.getClass().getSimpleName()));
    }
    // use SLF4J just like Cascading
    if (!logVersion) {
        logVersion = true;
        Logger esTapLogger = LoggerFactory.getLogger(EsTap.class);
        esTapLogger.info(String.format("Elasticsearch Hadoop %s initialized", Version.version()));
        esTapLogger.warn("ES-Hadoop Cascading Integration is Deprecated as of 6.6.0 and will be removed in a later release.");
    }
}
Code example (origin: cwensel/cascading)

@Test
public void testSkipStrategiesKeep() throws Exception
{
    getPlatform().copyFromLocal( inputFileApache );

    Tap source = getPlatform().getTextFile( inputFileApache );

    // !!! enable replace
    Tap sink = getPlatform().getTextFile( getOutputPath( "keep" ), SinkMode.KEEP );

    Pipe pipe = new Pipe( "test" );
    pipe = new Each( pipe, new Fields( "line" ), new RegexParser( new Fields( "ip" ), "^[^ ]*" ), new Fields( "ip" ) );

    Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe );

    sink.deleteResource( flow.getConfig() );

    assertTrue( "default skip", !flow.getFlowSkipStrategy().skipFlow( flow ) );
    assertTrue( "exist skip", !new FlowSkipIfSinkExists().skipFlow( flow ) );

    flow.complete();

    assertTrue( "default skip", flow.getFlowSkipStrategy().skipFlow( flow ) );
    assertTrue( "exist skip", new FlowSkipIfSinkExists().skipFlow( flow ) );

    validateLength( flow.openSource(), 10 ); // validate source, this once, as a sanity check
    validateLength( flow, 10, null );
}
Code example (origin: cwensel/cascading)

private void runPartitionTest( String postfix ) throws IOException
{
    getPlatform().copyFromLocal( inputFileCrossX2 );

    Tap source = getPlatform().getDelimitedFile( new Fields( "number", "lower", "upper" ), " ", inputFileCrossX2 );
    Tap partitionTap = getPlatform().getDelimitedFile( new Fields( "upper" ), "+", getOutputPath( "/partitioned" ), SinkMode.REPLACE );

    Partition partition = new DelimitedPartition( new Fields( "lower", "number" ), "/", postfix );
    partitionTap = getPlatform().getPartitionTap( partitionTap, partition, 1 );

    Flow firstFlow = getPlatform().getFlowConnector().connect( source, partitionTap, new Pipe( "partition" ) );
    firstFlow.complete();

    Tap sink = getPlatform().getDelimitedFile( new Fields( "number", "lower", "upper" ), "+", getOutputPath( "/final" ), SinkMode.REPLACE );

    Flow secondFlow = getPlatform().getFlowConnector().connect( partitionTap, sink, new Pipe( "copy" ) );
    secondFlow.complete();

    Tap test = getPlatform().getTextFile( new Fields( "line" ), partitionTap.getIdentifier().toString() + "/a/1" + postfix );
    validateLength( firstFlow.openTapForRead( test ), 6, Pattern.compile( "[A-Z]" ) );

    test = getPlatform().getTextFile( new Fields( "line" ), partitionTap.getIdentifier().toString() + "/b/2" + postfix );
    validateLength( firstFlow.openTapForRead( test ), 6, Pattern.compile( "[A-Z]" ) );

    List<Tuple> tuples = asList( firstFlow, partitionTap );

    assertEquals( 2, Collections.frequency( tuples, new Tuple( "A", "a", "1" ) ) );
    assertEquals( 2, Collections.frequency( tuples, new Tuple( "B", "b", "2" ) ) );

    test = getPlatform().getTextFile( new Fields( "line" ), sink.getIdentifier() );
    validateLength( secondFlow.openTapForRead( test ), 74, Pattern.compile( "[0-9]\\+[a-z]\\+[A-Z]" ) );
}
Code example (origin: cwensel/cascading)

private void runComprehensiveCase( Boolean[] testCase, boolean useCollectionsComparator ) throws IOException
{
    getPlatform().copyFromLocal( inputFileCrossNulls );

    String test = Util.join( testCase, "_", true ) + "_" + useCollectionsComparator;
    String path = "comprehensive/" + test;

    Tap source = getPlatform().getTextFile( new Fields( "line" ), inputFileCrossNulls );
    Tap sink = getPlatform().getDelimitedFile( new Fields( "num", "lower", "upper" ).applyTypes( Long.class, String.class, String.class ), " ", getOutputPath( path ), SinkMode.REPLACE );

    sink.getScheme().setNumSinkParts( 1 );

    Pipe pipe = new Pipe( "comprehensivesort" );
    pipe = new Each( pipe, new Fields( "line" ), new RegexSplitter( new Fields( "num", "lower", "upper" ), "\\s" ) );
    pipe = new Each( pipe, new Fields( "num" ), new Identity( Long.class ), Fields.REPLACE );

    Fields groupFields = new Fields( "num" );

    if( testCase[ 0 ] )
        groupFields.setComparator( "num", useCollectionsComparator ? new NullSafeReverseComparator() : getPlatform().getLongComparator( true ) );

    Fields sortFields = null;

    if( testCase[ 1 ] != null )
    {
        sortFields = new Fields( "upper" );

        if( testCase[ 1 ] )
            sortFields.setComparator( "upper", useCollectionsComparator ? new NullSafeReverseComparator() : getPlatform().getStringComparator( true ) );
    }

    pipe = new GroupBy( pipe, groupFields, sortFields, testCase[ 2 ] );

    Map<Object, Object> properties = getProperties();

    if( getPlatform().isMapReduce() && getPlatform().getNumMapTasks( properties ) != null )
        getPlatform().setNumMapTasks( properties, 13 );

    Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe );
    flow.complete();

    validateCase( test, testCase, sink );
}
Code example (origin: cwensel/cascading)

@Test
public void testTrapTapSourceSink() throws Exception
{
    getPlatform().copyFromLocal( inputFileApache );

    Scheme scheme = getPlatform().getTestFailScheme();

    Tap source = getPlatform().getTap( scheme, inputFileApache, SinkMode.KEEP );

    Pipe pipe = new Pipe( "map" );
    pipe = new Each( pipe, new Fields( "line" ), new RegexParser( new Fields( "ip" ), "^[^ ]*" ), new Fields( "ip" ) );
    pipe = new GroupBy( pipe, new Fields( "ip" ) );
    pipe = new Every( pipe, new Count(), new Fields( "ip", "count" ) );

    Tap sink = getPlatform().getTap( scheme, getOutputPath( "trapsourcesink/sink" ), SinkMode.REPLACE );
    Tap trap = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "trapsourcesink/trap" ), SinkMode.REPLACE );

    Map<Object, Object> properties = getProperties();

    // compensate for running in cluster mode
    getPlatform().setNumMapTasks( properties, 1 );
    getPlatform().setNumReduceTasks( properties, 1 );
    getPlatform().setNumGatherPartitionTasks( properties, 1 );

    Flow flow = getPlatform().getFlowConnector( properties ).connect( "trap test", source, sink, trap, pipe );
    flow.complete();

    validateLength( flow.openTapForRead( getPlatform().getTextFile( sink.getIdentifier() ) ), 7 );
    validateLength( flow.openTrap(), 2, Pattern.compile( "bad data" ) ); // confirm the payload is written
}
Code example (origin: cwensel/cascading)

private void assertHeaders( Tap output, Flow flow ) throws IOException
{
    TupleEntryIterator iterator = flow.openTapForRead( getPlatform().getTextFile( new Fields( "line" ), output.getIdentifier() ) );

    assertEquals( iterator.next().getObject( 0 ), "first,second,third,fourth,fifth" );

    iterator.close();
}
Code example (origin: cwensel/cascading)

Tap tap = (Tap) element;

if( tap.getIdentifier() == null )
    throw new IllegalStateException( "tap may not have null identifier: " + tap.toString() );

tap.sourceConfInit( flowProcess, current );

Collection<String> paths = current.getStringCollection( CASCADING_LOCAL_RESOURCES + Tap.id( tap ) );
String resourceSubPath = Tap.id( tap );
Map<Path, Path> pathMap = TezUtil.addToClassPath( current, flowStagingPath, resourceSubPath, paths, LocalResourceType.FILE, taskLocalResources, null );

current.setStrings( CASCADING_REMOTE_RESOURCES + Tap.id( tap ), taskLocalResources.keySet().toArray( new String[ taskLocalResources.size() ] ) );
conf.set( "cascading.node.accumulated.source.conf." + Tap.id( tap ), pack( map, conf ) );

if( tap.getIdentifier() == null )
    throw new IllegalStateException( "tap may not have null identifier: " + tap.toString() );

tap.sourceConfInit( flowProcess, current );
Code example (origin: cascading/cascading-platform)

@Test
public void testCoGroupAroundCoGroupWith() throws Exception
{
    // hack to get classname
    runCoGroupAroundCoGroup( getPlatform().getDelimitedFile( new Fields( "num" ), "\t", inputFileNums10 ).getScheme().getClass(), "cogroupacogroupopt2" );
}
Code example (origin: cwensel/cascading)

@Test
public void testMultiSourceIterator() throws Exception
{
    getPlatform().copyFromLocal( inputFileLower );
    getPlatform().copyFromLocal( inputFileUpper );

    Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
    Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper );

    Tap source = new MultiSourceTap( sourceLower, sourceUpper );

    validateLength( source.openForRead( getPlatform().getFlowProcess() ), 10 );
}
Code example (origin: cwensel/cascading)

@Override
public int hashCode()
{
    int result = getIdentifier() != null ? getIdentifier().hashCode() : 0;
    result = 31 * result + ( getScheme() != null ? getScheme().hashCode() : 0 );
    return result;
}
Code example (origin: com.hotels/plunger)

private TupleEntryIterator getHadoopTupleEntryIterator() throws IOException {
    @SuppressWarnings("unchecked")
    Tap<JobConf, ?, ?> hadoopTap = (Tap<JobConf, ?, ?>) source;
    JobConf conf = new JobConf();
    FlowProcess<JobConf> flowProcess = new HadoopFlowProcess(conf);
    hadoopTap.sourceConfInit(flowProcess, conf);
    return hadoopTap.openForRead(flowProcess);
}
Code example (origin: elastic/elasticsearch-hadoop)

@Override
public TupleEntryCollector openForWrite(FlowProcess<Object> flowProcess, Object output) throws IOException {
    initInnerTapIfNotSetFromFlowProcess(flowProcess);
    return actualTap.openForWrite(flowProcess, output);
}
Code example (origin: com.twitter/maple)

private void initialize() throws IOException {
    tap.sinkConfInit(hadoopFlowProcess, conf);
    OutputFormat outputFormat = conf.getOutputFormat();
    LOG.info("Output format class is: " + outputFormat.getClass().toString());
    writer = outputFormat.getRecordWriter(null, conf, tap.getIdentifier(), Reporter.NULL);
    sinkCall.setOutput(this);
}
Code example (origin: elastic/elasticsearch-hadoop)

@Override
public void sinkConfInit(FlowProcess<Object> flowProcess, Object conf) {
    initInnerTapIfNotSetFromFlowProcess(flowProcess);
    actualTap.sinkConfInit(flowProcess, conf);
}
Code example (origin: cwensel/cascading)

/**
 * Method getFullIdentifier returns a fully qualified resource identifier.
 *
 * @param conf of type Config
 * @return String
 */
public String getFullIdentifier( Config conf )
{
    return getIdentifier();
}
Code example (origin: com.hotels/plunger)

private void writeToHadoopTap(Tap<?, ?, ?> tap) throws IOException {
    @SuppressWarnings("unchecked")
    Tap<JobConf, ?, ?> hadoopTap = (Tap<JobConf, ?, ?>) tap;
    JobConf conf = new JobConf();
    HadoopFlowProcess flowProcess = new HadoopFlowProcess(conf);
    hadoopTap.sinkConfInit(flowProcess, conf);
    TupleEntryCollector collector = hadoopTap.openForWrite(flowProcess);
    for (TupleEntry tuple : data.asTupleEntryList()) {
        collector.add(tuple);
    }
    collector.close();
}
Code example (origin: cwensel/cascading)

private void deleteOrFail( Tap tap ) throws IOException
{
    if( !tap.resourceExists( getConfig() ) )
        return;

    if( !tap.deleteResource( getConfig() ) )
        throw new FlowTapException( "unable to delete resource: " + tap.getFullIdentifier( getFlowProcess() ) );
}
Code example (origin: cwensel/cascading)

private static String createMessage( Tap tap, Fields incomingFields, Fields selectorFields )
{
    String message = "unable to resolve scheme sink selector: " + selectorFields.printVerbose() +
        ", with incoming: " + incomingFields.printVerbose();

    return TraceUtil.formatTrace( tap.getScheme(), message );
}
This content was collected from the Internet; in case of infringement, please contact the author to have it removed.