本文整理了Java中cascading.pipe.Pipe.<init>()方法的一些代码示例,展示了Pipe.<init>()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。Pipe.<init>()方法的具体详情如下:
包路径:cascading.pipe.Pipe
类名称:Pipe
方法名:<init>
[英]Constructor Pipe creates a new Pipe instance with the given name. This is useful as the 'start' or head of a pipe assembly.
[中]Pipe 构造函数使用给定的名称创建一个新的 Pipe 实例。它适合用作管道组合(pipe assembly)的“起点”或头部。
代码示例来源:origin: cwensel/cascading
/**
 * Builds a merge assembly from two named source taps and connects it to a sink.
 * <p>
 * Both branches apply the same RegexSplitter (pattern " ", declaring num/char) to the "line"
 * field and are then combined by a GroupBy named "merge" on "num". The flow is only connected,
 * never run, so the test passes when connect() completes without throwing.
 */
@Test
public void testBuildMerge()
{
  Tap sourceLower = getPlatform().getTextFile( "file1" );
  Tap sourceUpper = getPlatform().getTextFile( "file2" );

  // keys must match the head pipe names ("lower" / "upper") created below
  Map<String, Tap> sources = new HashMap<String, Tap>();
  sources.put( "lower", sourceLower );
  sources.put( "upper", sourceUpper );

  Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );
  Tap sink = getPlatform().getTextFile( "outpath", SinkMode.REPLACE );

  Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter );
  Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter );
  Pipe splice = new GroupBy( "merge", Pipe.pipes( pipeLower, pipeUpper ), new Fields( "num" ), null, false );

  // the returned Flow is intentionally discarded; only the connect step is exercised
  getPlatform().getFlowConnector().connect( sources, sink, splice );
}
代码示例来源:origin: cascading/cascading-platform
/**
 * Builds a merge assembly from two named source taps and connects it to a sink.
 * <p>
 * Both branches apply the same RegexSplitter (pattern " ", declaring num/char) to the "line"
 * field and are then combined by a GroupBy named "merge" on "num". The flow is only connected,
 * never run, so the test passes when connect() completes without throwing.
 */
@Test
public void testBuildMerge()
{
  Tap sourceLower = getPlatform().getTextFile( "file1" );
  Tap sourceUpper = getPlatform().getTextFile( "file2" );

  // keys must match the head pipe names ("lower" / "upper") created below
  Map<String, Tap> sources = new HashMap<String, Tap>();
  sources.put( "lower", sourceLower );
  sources.put( "upper", sourceUpper );

  Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );
  Tap sink = getPlatform().getTextFile( "outpath", SinkMode.REPLACE );

  Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter );
  Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter );
  Pipe splice = new GroupBy( "merge", Pipe.pipes( pipeLower, pipeUpper ), new Fields( "num" ), null, false );

  // the returned Flow is intentionally discarded; only the connect step is exercised
  getPlatform().getFlowConnector().connect( sources, sink, splice );
}
代码示例来源:origin: cascading/cascading-platform
/**
 * Connects a flow whose head pipe is called {@code name}.
 * <p>
 * The assembly applies a RegexSplitter (pattern "\\.", declaring first/second/third/fourth)
 * followed by a FieldJoiner (delimiter "-", declaring mangled). The sink is a tab delimited
 * file for the "mangled" field located at {@code getOutputPath( name )}, opened in REPLACE mode.
 *
 * @param name   head pipe name, also used as the output path argument
 * @param source tap the flow reads from
 * @return the flow returned by the connector; this method does not run it
 */
private Flow secondFlow( String name, Tap source )
{
  final Pipe head = new Pipe( name );

  final RegexSplitter dotSplitter = new RegexSplitter( new Fields( "first", "second", "third", "fourth" ), "\\." );
  final Pipe splitStage = new Each( head, dotSplitter );

  final FieldJoiner dashJoiner = new FieldJoiner( new Fields( "mangled" ), "-" );
  final Pipe tail = new Each( splitStage, dashJoiner );

  final Tap mangledSink = getPlatform().getTabDelimitedFile( new Fields( "mangled" ), getOutputPath( name ), SinkMode.REPLACE );

  return getPlatform().getFlowConnector().connect( source, mangledSink, tail );
}
代码示例来源:origin: cwensel/cascading
/**
 * Runs a flow that splits the "line" field into num/lower/upper with a RegexSplitter and then
 * applies an UnGroup declaring num/char, with "num" as the group selector and "lower" / "upper"
 * as the value selectors.
 * <p>
 * The input file is first copied to the platform, the flow is run to completion, and the result
 * is checked with {@code validateLength( flow, 10 )}.
 *
 * @throws Exception if copying the input, running the flow or validating the output fails
 */
@Test
public void testUnGroup() throws Exception
{
  getPlatform().copyFromLocal( inputFileJoined );

  final Tap joinedInput = getPlatform().getTextFile( inputFileJoined );
  final Tap ungroupedOutput = getPlatform().getTextFile( getOutputPath( "ungrouped" ), SinkMode.REPLACE );

  final Pipe head = new Pipe( "test" );

  final RegexSplitter lineSplitter = new RegexSplitter( new Fields( "num", "lower", "upper" ) );
  final Pipe splitStage = new Each( head, new Fields( "line" ), lineSplitter );

  final UnGroup unGroup = new UnGroup(
    new Fields( "num", "char" ),
    new Fields( "num" ),
    Fields.fields( new Fields( "lower" ), new Fields( "upper" ) ) );
  final Pipe tail = new Each( splitStage, unGroup );

  final Flow ungroupFlow = getPlatform().getFlowConnector().connect( joinedInput, ungroupedOutput, tail );
  ungroupFlow.complete();

  validateLength( ungroupFlow, 10 );
}
代码示例来源:origin: cwensel/cascading
/**
 * Connects the flow named "second".
 * <p>
 * A pipe called "second" is passed through a RegexSplitter (pattern "\\.", declaring
 * first/second/third/fourth) and written to a tab delimited file with those four fields at
 * {@code getOutputPath( path )}, opened in REPLACE mode.
 *
 * @param source tap the flow reads from
 * @param path   argument handed to getOutputPath for the sink location
 * @return the flow returned by the connector; this method does not run it
 */
private Flow secondFlow( Tap source, String path )
{
  final Pipe tail = new Each(
    new Pipe( "second" ),
    new RegexSplitter( new Fields( "first", "second", "third", "fourth" ), "\\." ) );

  final Tap splitSink = getPlatform().getTabDelimitedFile(
    new Fields( "first", "second", "third", "fourth" ),
    getOutputPath( path ),
    SinkMode.REPLACE );

  return getPlatform().getFlowConnector().connect( "second", source, splitSink, tail );
}
代码示例来源:origin: cwensel/cascading
/**
 * Connects a flow that passes a pipe called "second" through a RegexSplitter (pattern "\\.",
 * declaring first/second/third/fourth).
 * <p>
 * Output goes to a tab delimited file with those four fields at
 * {@code getOutputPath( path + "/second" )}, opened in REPLACE mode.
 *
 * @param source tap the flow reads from
 * @param path   prefix of the output path; "/second" is appended to it
 * @return the flow returned by the connector; this method does not run it
 */
private Flow secondFlow( Tap source, String path )
{
  final Pipe tail = new Each(
    new Pipe( "second" ),
    new RegexSplitter( new Fields( "first", "second", "third", "fourth" ), "\\." ) );

  final Tap splitSink = getPlatform().getTabDelimitedFile(
    new Fields( "first", "second", "third", "fourth" ),
    getOutputPath( path + "/second" ),
    SinkMode.REPLACE );

  return getPlatform().getFlowConnector().connect( source, splitSink, tail );
}
代码示例来源:origin: cwensel/cascading
/**
 * Connects a flow that passes a pipe called "third" through a FieldJoiner (delimiter "-",
 * declaring mangled).
 * <p>
 * Output goes to a tab delimited file for the "mangled" field at
 * {@code getOutputPath( path + "/third" )}, opened in REPLACE mode.
 *
 * @param source tap the flow reads from
 * @param path   prefix of the output path; "/third" is appended to it
 * @return the flow returned by the connector; this method does not run it
 */
private Flow thirdFlow( Tap source, String path )
{
  final FieldJoiner dashJoiner = new FieldJoiner( new Fields( "mangled" ), "-" );
  final Pipe tail = new Each( new Pipe( "third" ), dashJoiner );

  final Tap mangledSink = getPlatform().getTabDelimitedFile(
    new Fields( "mangled" ),
    getOutputPath( path + "/third" ),
    SinkMode.REPLACE );

  return getPlatform().getFlowConnector().connect( source, mangledSink, tail );
}
代码示例来源:origin: cascading/cascading-platform
/**
 * Connects a flow that reads {@code inputFileIps} as a text file and, on a pipe called "first",
 * applies an Identity declaring "ip" to the "line" field, selecting "ip" as the output.
 * <p>
 * Output goes to a tab delimited file for the "ip" field at
 * {@code getOutputPath( path + "/first" )}, opened in REPLACE mode.
 *
 * @param path prefix of the output path; "/first" is appended to it
 * @return the flow returned by the connector; this method does not run it
 */
private Flow firstFlow( String path )
{
  final Tap ipSource = getPlatform().getTextFile( inputFileIps );

  final Pipe head = new Pipe( "first" );
  final Identity lineAsIp = new Identity( new Fields( "ip" ) );
  final Pipe tail = new Each( head, new Fields( "line" ), lineAsIp, new Fields( "ip" ) );

  final Tap ipSink = getPlatform().getTabDelimitedFile(
    new Fields( "ip" ),
    getOutputPath( path + "/first" ),
    SinkMode.REPLACE );

  return getPlatform().getFlowConnector().connect( ipSource, ipSink, tail );
}
代码示例来源:origin: cwensel/cascading
/**
 * Connects a flow that reads {@code inputFileIps} as a text file and, on a pipe called
 * {@code name}, applies an Identity declaring "ip" to the "line" field, selecting "ip" as the
 * output.
 * <p>
 * Output goes to a tab delimited file for the "ip" field at {@code getOutputPath( name )},
 * opened in REPLACE mode.
 *
 * @param name head pipe name, also used as the output path argument
 * @return the flow returned by the connector; this method does not run it
 */
private Flow firstFlow( String name )
{
  final Tap ipSource = getPlatform().getTextFile( inputFileIps );

  final Pipe head = new Pipe( name );
  final Identity lineAsIp = new Identity( new Fields( "ip" ) );
  final Pipe tail = new Each( head, new Fields( "line" ), lineAsIp, new Fields( "ip" ) );

  final Tap ipSink = getPlatform().getTabDelimitedFile(
    new Fields( "ip" ),
    getOutputPath( name ),
    SinkMode.REPLACE );

  return getPlatform().getFlowConnector().connect( ipSource, ipSink, tail );
}
代码示例来源:origin: cwensel/cascading
/**
 * Connects a flow that reads {@code inputFileIps} as a text file and, on a pipe called "first",
 * applies an Identity declaring "ip" to the "line" field, selecting "ip" as the output.
 * <p>
 * Output goes to a tab delimited file for the "ip" field at
 * {@code getOutputPath( path + "/first" )}, opened in REPLACE mode.
 *
 * @param path prefix of the output path; "/first" is appended to it
 * @return the flow returned by the connector; this method does not run it
 */
private Flow firstFlow( String path )
{
  final Tap ipSource = getPlatform().getTextFile( inputFileIps );

  final Pipe head = new Pipe( "first" );
  final Identity lineAsIp = new Identity( new Fields( "ip" ) );
  final Pipe tail = new Each( head, new Fields( "line" ), lineAsIp, new Fields( "ip" ) );

  final Tap ipSink = getPlatform().getTabDelimitedFile(
    new Fields( "ip" ),
    getOutputPath( path + "/first" ),
    SinkMode.REPLACE );

  return getPlatform().getFlowConnector().connect( ipSource, ipSink, tail );
}
代码示例来源:origin: cascading/cascading-platform
/**
 * Connects the flow named "third".
 * <p>
 * A pipe called "third" is passed through a FieldJoiner (delimiter "-", declaring mangled) and
 * written to a tab delimited file for the "mangled" field at {@code getOutputPath( path )},
 * opened in REPLACE mode.
 *
 * @param source tap the flow reads from
 * @param path   argument handed to getOutputPath for the sink location
 * @return the flow returned by the connector; this method does not run it
 */
private Flow thirdFlow( Tap source, String path )
{
  final FieldJoiner dashJoiner = new FieldJoiner( new Fields( "mangled" ), "-" );
  final Pipe tail = new Each( new Pipe( "third" ), dashJoiner );

  final Tap mangledSink = getPlatform().getTabDelimitedFile(
    new Fields( "mangled" ),
    getOutputPath( path ),
    SinkMode.REPLACE );

  return getPlatform().getFlowConnector().connect( "third", source, mangledSink, tail );
}
代码示例来源:origin: cwensel/cascading
/**
 * Exercises {@code Fields.SWAP} as the output selector of two Each stages.
 * <p>
 * The assembly parses "line" with a RegexParser declaring "ip", groups by "ip", applies a Count
 * declaring "count", and finally runs an Identity declaring "ipaddress" over "ip". The flow is
 * run to completion and the result is checked with validateLength against a line pattern.
 *
 * @throws Exception if running the flow or validating the output fails
 */
@Test
public void testSwap() throws Exception
{
  final Tap apacheSource = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileApache );
  final Tap swapSink = getPlatform().getTextFile(
    new Fields( "offset", "line" ),
    new Fields( "count", "ipaddress" ),
    getOutputPath( "swap" ),
    SinkMode.REPLACE );

  final Pipe head = new Pipe( "test" );

  final Function ipParser = new RegexParser( new Fields( "ip" ), "^[^ ]*" );

  final Pipe parsed = new Each( head, new Fields( "line" ), ipParser, Fields.SWAP );
  final Pipe grouped = new GroupBy( parsed, new Fields( "ip" ) );
  final Pipe counted = new Every( grouped, new Fields( "ip" ), new Count( new Fields( "count" ) ) );
  final Pipe tail = new Each( counted, new Fields( "ip" ), new Identity( new Fields( "ipaddress" ) ), Fields.SWAP );

  final Flow swapFlow = getPlatform().getFlowConnector().connect( apacheSource, swapSink, tail );
  swapFlow.complete();

  final Pattern expectedLine = Pattern.compile( "^\\d+\\s\\d+\\s[\\d]{1,3}\\.[\\d]{1,3}\\.[\\d]{1,3}\\.[\\d]{1,3}$" );
  validateLength( swapFlow, 8, 2, expectedLine );
}
代码示例来源:origin: cascading/cascading-platform
/**
 * Runs a flow that splits the "line" field into num/lower/upper with a RegexSplitter and then
 * applies an UnGroup declaring num/char, with "num" as the group selector and "lower" / "upper"
 * as the value selectors.
 * <p>
 * The input file is first copied to the platform, the flow is run to completion, and the result
 * is checked with {@code validateLength( flow, 10 )}.
 *
 * @throws Exception if copying the input, running the flow or validating the output fails
 */
@Test
public void testUnGroup() throws Exception
{
  getPlatform().copyFromLocal( inputFileJoined );

  final Tap joinedInput = getPlatform().getTextFile( inputFileJoined );
  final Tap ungroupedOutput = getPlatform().getTextFile( getOutputPath( "ungrouped" ), SinkMode.REPLACE );

  final Pipe head = new Pipe( "test" );

  final RegexSplitter lineSplitter = new RegexSplitter( new Fields( "num", "lower", "upper" ) );
  final Pipe splitStage = new Each( head, new Fields( "line" ), lineSplitter );

  final UnGroup unGroup = new UnGroup(
    new Fields( "num", "char" ),
    new Fields( "num" ),
    Fields.fields( new Fields( "lower" ), new Fields( "upper" ) ) );
  final Pipe tail = new Each( splitStage, unGroup );

  final Flow ungroupFlow = getPlatform().getFlowConnector().connect( joinedInput, ungroupedOutput, tail );
  ungroupFlow.complete();

  validateLength( ungroupFlow, 10 );
}
代码示例来源:origin: cascading/cascading-platform
/**
 * Connects the flow named "second".
 * <p>
 * A pipe called "second" is passed through a RegexSplitter (pattern "\\.", declaring
 * first/second/third/fourth) and written to a tab delimited file with those four fields at
 * {@code getOutputPath( path )}, opened in REPLACE mode.
 *
 * @param source tap the flow reads from
 * @param path   argument handed to getOutputPath for the sink location
 * @return the flow returned by the connector; this method does not run it
 */
private Flow secondFlow( Tap source, String path )
{
  final Pipe tail = new Each(
    new Pipe( "second" ),
    new RegexSplitter( new Fields( "first", "second", "third", "fourth" ), "\\." ) );

  final Tap splitSink = getPlatform().getTabDelimitedFile(
    new Fields( "first", "second", "third", "fourth" ),
    getOutputPath( path ),
    SinkMode.REPLACE );

  return getPlatform().getFlowConnector().connect( "second", source, splitSink, tail );
}
代码示例来源:origin: cwensel/cascading
/**
 * Connects a flow named {@code "previous-multi-tap-" + ordinal}.
 * <p>
 * It reads {@code inputFileIps} as a text file and, on a pipe called {@code ordinal}, applies an
 * Identity declaring "ip" to the "line" field, selecting "ip" as the output. The sink is a tab
 * delimited file for the "ip" field at {@code getOutputPath( path + "/" + ordinal )}, opened in
 * REPLACE mode.
 *
 * @param path    prefix of the output path
 * @param ordinal head pipe name; also the last output path segment and the flow name suffix
 * @return the flow returned by the connector; this method does not run it
 */
private Flow previousMultiTapFlow( String path, String ordinal )
{
  final Tap ipSource = getPlatform().getTextFile( inputFileIps );

  final Pipe head = new Pipe( ordinal );
  final Identity lineAsIp = new Identity( new Fields( "ip" ) );
  final Pipe tail = new Each( head, new Fields( "line" ), lineAsIp, new Fields( "ip" ) );

  final Tap ipSink = getPlatform().getTabDelimitedFile(
    new Fields( "ip" ),
    getOutputPath( path + "/" + ordinal ),
    SinkMode.REPLACE );

  final String flowName = "previous-multi-tap-" + ordinal;

  return getPlatform().getFlowConnector().connect( flowName, ipSource, ipSink, tail );
}
代码示例来源:origin: cwensel/cascading
/**
 * Connects a flow whose head pipe is called {@code name}.
 * <p>
 * The assembly applies a RegexSplitter (pattern "\\.", declaring first/second/third/fourth)
 * followed by a FieldJoiner (delimiter "-", declaring mangled). The sink is a tab delimited
 * file for the "mangled" field located at {@code getOutputPath( name )}, opened in REPLACE mode.
 *
 * @param name   head pipe name, also used as the output path argument
 * @param source tap the flow reads from
 * @return the flow returned by the connector; this method does not run it
 */
private Flow secondFlow( String name, Tap source )
{
  final Pipe head = new Pipe( name );

  final RegexSplitter dotSplitter = new RegexSplitter( new Fields( "first", "second", "third", "fourth" ), "\\." );
  final Pipe splitStage = new Each( head, dotSplitter );

  final FieldJoiner dashJoiner = new FieldJoiner( new Fields( "mangled" ), "-" );
  final Pipe tail = new Each( splitStage, dashJoiner );

  final Tap mangledSink = getPlatform().getTabDelimitedFile( new Fields( "mangled" ), getOutputPath( name ), SinkMode.REPLACE );

  return getPlatform().getFlowConnector().connect( source, mangledSink, tail );
}
代码示例来源:origin: cascading/cascading-platform
/**
 * Connects a flow named {@code "previous-multi-tap-" + ordinal}.
 * <p>
 * It reads {@code inputFileIps} as a text file and, on a pipe called {@code ordinal}, applies an
 * Identity declaring "ip" to the "line" field, selecting "ip" as the output. The sink is a tab
 * delimited file for the "ip" field at {@code getOutputPath( path + "/" + ordinal )}, opened in
 * REPLACE mode.
 *
 * @param path    prefix of the output path
 * @param ordinal head pipe name; also the last output path segment and the flow name suffix
 * @return the flow returned by the connector; this method does not run it
 */
private Flow previousMultiTapFlow( String path, String ordinal )
{
  final Tap ipSource = getPlatform().getTextFile( inputFileIps );

  final Pipe head = new Pipe( ordinal );
  final Identity lineAsIp = new Identity( new Fields( "ip" ) );
  final Pipe tail = new Each( head, new Fields( "line" ), lineAsIp, new Fields( "ip" ) );

  final Tap ipSink = getPlatform().getTabDelimitedFile(
    new Fields( "ip" ),
    getOutputPath( path + "/" + ordinal ),
    SinkMode.REPLACE );

  final String flowName = "previous-multi-tap-" + ordinal;

  return getPlatform().getFlowConnector().connect( flowName, ipSource, ipSink, tail );
}
代码示例来源:origin: cascading/cascading-platform
/**
 * Connects a flow that passes a pipe called "third" through a FieldJoiner (delimiter "-",
 * declaring mangled).
 * <p>
 * Output goes to a tab delimited file for the "mangled" field at
 * {@code getOutputPath( path + "/third" )}, opened in REPLACE mode.
 *
 * @param source tap the flow reads from
 * @param path   prefix of the output path; "/third" is appended to it
 * @return the flow returned by the connector; this method does not run it
 */
private Flow thirdFlow( Tap source, String path )
{
  final FieldJoiner dashJoiner = new FieldJoiner( new Fields( "mangled" ), "-" );
  final Pipe tail = new Each( new Pipe( "third" ), dashJoiner );

  final Tap mangledSink = getPlatform().getTabDelimitedFile(
    new Fields( "mangled" ),
    getOutputPath( path + "/third" ),
    SinkMode.REPLACE );

  return getPlatform().getFlowConnector().connect( source, mangledSink, tail );
}
代码示例来源:origin: cwensel/cascading
/**
 * Connects the flow named "third".
 * <p>
 * A pipe called "third" is passed through a FieldJoiner (delimiter "-", declaring mangled) and
 * written to a tab delimited file for the "mangled" field at {@code getOutputPath( path )},
 * opened in REPLACE mode.
 *
 * @param source tap the flow reads from
 * @param path   argument handed to getOutputPath for the sink location
 * @return the flow returned by the connector; this method does not run it
 */
private Flow thirdFlow( Tap source, String path )
{
  final FieldJoiner dashJoiner = new FieldJoiner( new Fields( "mangled" ), "-" );
  final Pipe tail = new Each( new Pipe( "third" ), dashJoiner );

  final Tap mangledSink = getPlatform().getTabDelimitedFile(
    new Fields( "mangled" ),
    getOutputPath( path ),
    SinkMode.REPLACE );

  return getPlatform().getFlowConnector().connect( "third", source, mangledSink, tail );
}
代码示例来源:origin: cascading/cascading-platform
/**
 * Exercises {@code Fields.SWAP} as the output selector of two Each stages.
 * <p>
 * The assembly parses "line" with a RegexParser declaring "ip", groups by "ip", applies a Count
 * declaring "count", and finally runs an Identity declaring "ipaddress" over "ip". The flow is
 * run to completion and the result is checked with validateLength against a line pattern.
 *
 * @throws Exception if running the flow or validating the output fails
 */
@Test
public void testSwap() throws Exception
{
  final Tap apacheSource = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileApache );
  final Tap swapSink = getPlatform().getTextFile(
    new Fields( "offset", "line" ),
    new Fields( "count", "ipaddress" ),
    getOutputPath( "swap" ),
    SinkMode.REPLACE );

  final Pipe head = new Pipe( "test" );

  final Function ipParser = new RegexParser( new Fields( "ip" ), "^[^ ]*" );

  final Pipe parsed = new Each( head, new Fields( "line" ), ipParser, Fields.SWAP );
  final Pipe grouped = new GroupBy( parsed, new Fields( "ip" ) );
  final Pipe counted = new Every( grouped, new Fields( "ip" ), new Count( new Fields( "count" ) ) );
  final Pipe tail = new Each( counted, new Fields( "ip" ), new Identity( new Fields( "ipaddress" ) ), Fields.SWAP );

  final Flow swapFlow = getPlatform().getFlowConnector().connect( apacheSource, swapSink, tail );
  swapFlow.complete();

  final Pattern expectedLine = Pattern.compile( "^\\d+\\s\\d+\\s[\\d]{1,3}\\.[\\d]{1,3}\\.[\\d]{1,3}\\.[\\d]{1,3}$" );
  validateLength( swapFlow, 8, 2, expectedLine );
}
内容来源于网络,如有侵权,请联系作者删除!