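This article collects code examples for the Java method cascading.tap.Tap.sourceConfInit() and shows how Tap.sourceConfInit() is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they should serve as a useful reference. Details of Tap.sourceConfInit() are as follows: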
Fully qualified name: cascading.tap.Tap
Class name: Tap
Method name: sourceConfInit
Description:
Method sourceConfInit initializes this instance as a source.
This method may be called more than once if this Tap instance is used outside the scope of a cascading.flow.Flow instance, or if it participates multiple times in a given Flow or across different Flows in a cascading.cascade.Cascade.
In the context of a Flow, it will be called after cascading.flow.FlowListener#onStarting(cascading.flow.Flow).
Note that no resources or services should be modified by this method.
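Because the method may be called more than once and should not modify resources or services, one reasonable way to honor that contract is to restrict it to writing settings into the supplied configuration. The following is a minimal sketch under that assumption. SketchFileTap, IdempotentInitDemo, and the key "sketch.input.path" are hypothetical stand-ins that use only java.util.Properties; they are not Cascading classes or properties.

```java
import java.util.Properties;

// Hypothetical stand-in for a Tap; NOT the real cascading.tap.Tap class.
class SketchFileTap {
    private final String path;

    SketchFileTap(String path) {
        this.path = path;
    }

    // Only writes settings into the supplied config; touches no files or services.
    void sourceConfInit(Properties conf) {
        conf.setProperty("sketch.input.path", path); // hypothetical key
    }
}

class IdempotentInitDemo {
    // Calls sourceConfInit twice on the same config and returns the result.
    static Properties initTwice() {
        Properties conf = new Properties();
        SketchFileTap tap = new SketchFileTap("/data/in");
        tap.sourceConfInit(conf);
        tap.sourceConfInit(conf); // the second call overwrites the same key with the same value
        return conf;
    }
}
```

In this sketch a repeated call is harmless because it only overwrites one key with the same value. Whether a real Tap behaves this way depends on its implementation.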
Code example source: elastic/elasticsearch-hadoop
@Override
public void sourceConfInit(FlowProcess<Object> flowProcess, Object conf) {
    // make sure the wrapped tap exists before delegating to it
    initInnerTapIfNotSetFromFlowProcess(flowProcess);
    actualTap.sourceConfInit(flowProcess, conf);
}
Code example source: cwensel/cascading
@Override
public void sourceConfInit( FlowProcess<? extends Config> flowProcess, Config conf )
  {
  original.sourceConfInit( flowProcess, conf );
  }
Code example source: cwensel/cascading
@Override
public void sourceConfInit( FlowProcess<? extends TConfig> flowProcess, TConfig conf )
  {
  // adapt the process and config types before delegating to the wrapped tap
  original.sourceConfInit( processProvider.apply( flowProcess ), configProvider.apply( conf ) );
  }
Code example source: cwensel/cascading
@Override
public void sourceConfInit( FlowProcess<? extends Config> process, Config conf )
  {
  // initialize every child tap against the same config
  for( Tap tap : getTaps() )
    tap.sourceConfInit( process, conf );
  }
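The delegation examples above share one shape: a wrapper forwards sourceConfInit to one or more inner taps. The following is a rough, dependency-free sketch of the composite variant. SketchSource, PathSource, CompositeSource, and the key "sketch.input.paths" are hypothetical names for illustration, not Cascading types, and the path-accumulating behavior is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

// Hypothetical minimal interface standing in for the source side of a Tap.
interface SketchSource {
    void sourceConfInit(Properties conf);
}

// Appends its path to a comma-separated list held in the config.
class PathSource implements SketchSource {
    private final String path;

    PathSource(String path) {
        this.path = path;
    }

    @Override
    public void sourceConfInit(Properties conf) {
        String existing = conf.getProperty("sketch.input.paths"); // hypothetical key
        conf.setProperty("sketch.input.paths", existing == null ? path : existing + "," + path);
    }
}

// Forwards the call to each child in order, like the loop over getTaps() above.
class CompositeSource implements SketchSource {
    private final List<SketchSource> children;

    CompositeSource(SketchSource... sources) {
        this.children = new ArrayList<>(Arrays.asList(sources));
    }

    @Override
    public void sourceConfInit(Properties conf) {
        for (SketchSource child : children)
            child.sourceConfInit(conf);
    }
}
```

Since each child in this sketch appends to a shared key, calling the composite twice on the same config would list the paths twice, which is one reason callers often pass a fresh config copy.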
Code example source: com.hotels/plunger
private TupleEntryIterator getHadoopTupleEntryIterator() throws IOException {
  @SuppressWarnings("unchecked")
  Tap<JobConf, ?, ?> hadoopTap = (Tap<JobConf, ?, ?>) source;
  JobConf conf = new JobConf();
  FlowProcess<JobConf> flowProcess = new HadoopFlowProcess(conf);
  // initialize the tap as a source before reading from it outside a Flow
  hadoopTap.sourceConfInit(flowProcess, conf);
  return hadoopTap.openForRead(flowProcess);
}
Code example source: com.hotels/plunger
private TupleEntryIterator getLocalTupleEntryIterator() throws IOException {
  @SuppressWarnings("unchecked")
  Tap<Properties, ?, ?> localTap = (Tap<Properties, ?, ?>) source;
  Properties properties = new Properties();
  FlowProcess<Properties> flowProcess = new LocalFlowProcess(properties);
  // initialize the tap as a source before reading from it outside a Flow
  localTap.sourceConfInit(flowProcess, properties);
  return localTap.openForRead(flowProcess);
}
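Both plunger methods follow the same order when reading outside a Flow: build a config, call sourceConfInit, then open the tap for reading. The following sketch mimics that order with an in-memory stand-in. InMemoryTap, StandaloneReadDemo, and the key "sketch.source.initialized" are hypothetical, and the check inside openForRead is an assumption of this sketch rather than documented Cascading behavior.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;

// Hypothetical in-memory stand-in for a Tap; NOT a Cascading class.
class InMemoryTap {
    private final List<String> rows;

    InMemoryTap(String... rows) {
        this.rows = Arrays.asList(rows);
    }

    // Records in the config that the source side has been initialized.
    void sourceConfInit(Properties conf) {
        conf.setProperty("sketch.source.initialized", "true"); // hypothetical key
    }

    // Sketch-only guard: refuses to read from a config that was never initialized.
    Iterator<String> openForRead(Properties conf) {
        if (!"true".equals(conf.getProperty("sketch.source.initialized")))
            throw new IllegalStateException("sourceConfInit must be called before openForRead");
        return rows.iterator();
    }
}

class StandaloneReadDemo {
    // Initializes the tap, reads every row, and returns how many rows were seen.
    static int countRows() {
        Properties conf = new Properties();
        InMemoryTap tap = new InMemoryTap("a", "b", "c");
        tap.sourceConfInit(conf);
        Iterator<String> rows = tap.openForRead(conf);
        int count = 0;
        while (rows.hasNext()) {
            rows.next();
            count++;
        }
        return count;
    }
}
```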
Code example source: cwensel/cascading
protected void initTaps( FlowProcess<Properties> flowProcess, Properties conf, Set<Tap> taps, boolean isSink )
  {
  if( !taps.isEmpty() )
    {
    for( Tap tap : taps )
      {
      // each tap gets its own copy of the config
      Properties confCopy = flowProcess.copyConfig( conf );
      tapProperties.put( tap, confCopy ); // todo: store the diff, not the copy
      if( isSink )
        tap.sinkConfInit( flowProcess, confCopy );
      else
        tap.sourceConfInit( flowProcess, confCopy );
      }
    }
  }
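The initTaps example gives every tap its own copy of the configuration before initializing it. The sketch below illustrates why that helps: settings written for one tap cannot leak into another tap's config or into the shared base. ConfCopyDemo, initAll, and the key "sketch.input.path" are hypothetical names, and the copy is made with plain java.util.Properties instead of FlowProcess.copyConfig.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;

class ConfCopyDemo {
    // Builds one independent config per tap name, starting from the shared base.
    static Map<String, Properties> initAll(Properties base, String... tapNames) {
        Map<String, Properties> perTap = new LinkedHashMap<>();
        for (String name : tapNames) {
            Properties copy = new Properties();
            copy.putAll(base); // copy the entries so the base config stays untouched
            copy.setProperty("sketch.input.path", "/data/" + name); // hypothetical per-tap setting
            perTap.put(name, copy);
        }
        return perTap;
    }
}
```

A call such as ConfCopyDemo.initAll(base, "lower", "upper") returns two configs that share the base entries but carry different input paths, while base itself gains no new keys.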
Code example source: cwensel/cascading
protected void sourceConfInitComplete( FlowProcess<? extends Configuration> process, Configuration conf )
  {
  super.sourceConfInit( process, conf );
  TupleSerialization.setSerializations( conf ); // allows Hfs to be used independent of Flow
  // use CombineFileInputFormat if that is enabled
  handleCombineFileInputFormat( conf );
  }
Code example source: cascading/cascading-jdbc-core
@Override
public void sourceConfInit( FlowProcess<? extends Configuration> process, Configuration conf )
  {
  // configure the connection, with credentials only when a username is set
  if( username == null )
    DBConfiguration.configureDB( conf, driverClassName, connectionUrl );
  else
    DBConfiguration.configureDB( conf, driverClassName, connectionUrl, username, password );
  super.sourceConfInit( process, conf );
  }
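The JDBC example shows a common override pattern: write the tap-specific settings into the config first, then call super.sourceConfInit so the parent can finish its own setup. A dependency-free sketch of that pattern follows. BaseSketchTap, JdbcSketchTap, and all "sketch.*" keys are hypothetical and do not correspond to DBConfiguration or any real Cascading property names.

```java
import java.util.Properties;

// Hypothetical parent that contributes its own setting.
class BaseSketchTap {
    void sourceConfInit(Properties conf) {
        conf.setProperty("sketch.scheme", "text"); // hypothetical key
    }
}

// Hypothetical subclass that adds connection settings before delegating to the parent.
class JdbcSketchTap extends BaseSketchTap {
    private final String url;
    private final String username; // may be null when no credentials are needed
    private final String password; // may be null

    JdbcSketchTap(String url, String username, String password) {
        this.url = url;
        this.username = username;
        this.password = password;
    }

    @Override
    void sourceConfInit(Properties conf) {
        conf.setProperty("sketch.jdbc.url", url);
        if (username != null) {
            conf.setProperty("sketch.jdbc.username", username);
            if (password != null) // Properties rejects null values, so guard explicitly
                conf.setProperty("sketch.jdbc.password", password);
        }
        super.sourceConfInit(conf); // let the parent add its settings last
    }
}
```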
Code example source: com.twitter/maple
@Override
public void sourceConfInit(FlowProcess<JobConf> process, JobConf conf) {
  // a hack for MultiInputFormat to see that there is a child format
  FileInputFormat.setInputPaths( conf, getPath() );
  if(quorumNames != null) {
    conf.set("hbase.zookeeper.quorum", quorumNames);
  }
  LOG.debug("sourcing from table: {}", tableName);
  TableInputFormat.setTableName(conf, tableName);
  super.sourceConfInit(process, conf);
}
Code example source: cwensel/cascading
// excerpt: the surrounding loops and conditions are not included in the source
throw new IllegalStateException( "tap may not have null identifier: " + tap.toString() );
tap.sourceConfInit( flowProcess, current );
// ...
throw new IllegalStateException( "tap may not have null identifier: " + tap.toString() );
tap.sourceConfInit( flowProcess, current );
Code example source: cwensel/cascading
// excerpt: the surrounding code is not included in the source
tap.sourceConfInit( flowProcess, streamedJobs[ i ] );
// ...
tap.sourceConfInit( flowProcess, accumulatedJob );
Code example source: dataArtisans/cascading-flink
private DataSet<Tuple> translateSource(FlowProcess flowProcess, ExecutionEnvironment env, FlowNode node, int dop) {
  Tap tap = this.getSingle(node.getSourceTaps());
  // initialize the tap against its own copy of the node config
  JobConf tapConfig = new JobConf(this.getNodeConfig(node));
  tap.sourceConfInit(flowProcess, tapConfig);
  tapConfig.set( "cascading.step.source", Tap.id( tap ) );
  Fields outFields = tap.getSourceFields();
  registerKryoTypes(outFields);
  JobConf sourceConfig = new JobConf(this.getNodeConfig(node));
  MultiInputFormat.addInputFormat(sourceConfig, tapConfig);
  DataSet<Tuple> src = env
    .createInput(new TapInputFormat(node), new TupleTypeInfo(outFields))
    .name(tap.getIdentifier())
    .setParallelism(dop)
    .withParameters(FlinkConfigConverter.toFlinkConfig(new Configuration(sourceConfig)));
  return src;
}
Code example source: cascading/cascading-hadoop2-io
private void initialize() throws IOException
  {
  // prevent collisions of configuration properties set client side if now cluster side
  String property = flowProcess.getStringProperty( "cascading.node.accumulated.source.conf." + Tap.id( tap ) );
  if( property == null )
    {
    // default behavior is to accumulate paths, so remove any set prior
    conf = HadoopUtil.removePropertiesFrom( conf, "mapred.input.dir", "mapreduce.input.fileinputformat.inputdir" ); // hadoop2
    tap.sourceConfInit( flowProcess, conf );
    }
  JobConf jobConf = asJobConfInstance( conf );
  inputFormat = jobConf.getInputFormat();
  if( inputFormat instanceof JobConfigurable )
    ( (JobConfigurable) inputFormat ).configure( jobConf );
  // do not test for existence, let hadoop decide how to handle the given path
  // this delegates globbing to the inputformat on split generation.
  splits = inputFormat.getSplits( jobConf, 1 );
  if( splits.length == 0 )
    complete = true;
  }
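The initialize example clears previously set input-path properties before calling sourceConfInit, because the default behavior is to accumulate paths. The sketch below shows that idea with plain java.util.Properties. The helper here is a hypothetical re-creation written for illustration; it is not the implementation of HadoopUtil.removePropertiesFrom, and RemovePropsDemo is an invented name.

```java
import java.util.Properties;

class RemovePropsDemo {
    // Returns a copy of conf without the given keys; the original config is left untouched.
    // Hypothetical helper for illustration, not the HadoopUtil implementation.
    static Properties removePropertiesFrom(Properties conf, String... keys) {
        Properties copy = new Properties();
        copy.putAll(conf);
        for (String key : keys)
            copy.remove(key);
        return copy;
    }

    // Clears stale input paths, then lets the "tap" write its own path into the clean copy.
    static Properties reinitialize(Properties conf, String freshPath) {
        Properties clean = removePropertiesFrom(conf, "mapred.input.dir", "mapreduce.input.fileinputformat.inputdir");
        clean.setProperty("mapreduce.input.fileinputformat.inputdir", freshPath);
        return clean;
    }
}
```

Working on a copy means the caller's original config still holds the old paths, so nothing set on the client side is lost if it is needed elsewhere.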
Code example source: cwensel/cascading
@Test
public void testCombinedHfs() throws Exception
  {
  getPlatform().copyFromLocal( inputFileLower );
  getPlatform().copyFromLocal( inputFileUpper );
  Hfs sourceLower = new Hfs( new TextLine( new Fields( "offset", "line" ) ), InputData.inputFileLower );
  Hfs sourceUpper = new Hfs( new TextLine( new Fields( "offset", "line" ) ), InputData.inputFileUpper );
  // create a CombinedHfs instance on these files
  Tap source = new MultiSourceTap<Hfs, JobConf, RecordReader>( sourceLower, sourceUpper );
  FlowProcess<JobConf> process = getPlatform().getFlowProcess();
  JobConf conf = process.getConfigCopy();
  // set the combine flag
  conf.setBoolean( HfsProps.COMBINE_INPUT_FILES, true );
  conf.set( "cascading.flow.platform", "hadoop" ); // only supported on mr based platforms
  // test the input format and the split
  source.sourceConfInit( process, conf );
  InputFormat inputFormat = conf.getInputFormat();
  assertEquals( Hfs.CombinedInputFormat.class, inputFormat.getClass() );
  InputSplit[] splits = inputFormat.getSplits( conf, 1 );
  assertEquals( 1, splits.length );
  validateLength( source.openForRead( process ), 10 );
  }