This article collects code examples of the Java method cascading.tap.Tap.sinkConfInit(), showing how Tap.sinkConfInit() is used in practice. The examples are extracted from curated open-source projects found on platforms such as GitHub, Stack Overflow, and Maven, and should serve as useful references. Details of the Tap.sinkConfInit() method:

Package path: cascading.tap.Tap
Class name: Tap
Method name: sinkConfInit
Method sinkConfInit initializes this instance as a sink.
This method may be called more than once if this Tap instance is used outside the scope of a cascading.flow.Flow instance, or if it participates multiple times in a given Flow or across different Flows in a cascading.cascade.Cascade.
Note this method will be called in the context of this Tap being used both as a traditional 'sink' and as a 'trap'.
In the context of a Flow, it will be called after cascading.flow.FlowListener#onStarting(cascading.flow.Flow).
Note that no resources or services should be modified by this method. If this Tap instance returns true for #isReplace(), then #deleteResource(Object) will be called by the parent Flow.
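The contract described above (mutate only the passed-in configuration, never external resources, and tolerate being called more than once) can be illustrated with a minimal, self-contained sketch. Note that SimpleTap and TextLineTap below are hypothetical stand-ins for illustration, not the real Cascading API:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified stand-in for cascading.tap.Tap, illustrating the
// sinkConfInit() contract: only the passed-in configuration is mutated, and
// the method is safe to call repeatedly (e.g. once per Flow the Tap joins).
abstract class SimpleTap {
    abstract void sinkConfInit(Map<String, String> conf);
}

class TextLineTap extends SimpleTap {
    private final String outputPath;

    TextLineTap(String outputPath) {
        this.outputPath = outputPath;
    }

    @Override
    void sinkConfInit(Map<String, String> conf) {
        // Idempotent: repeated calls set the same keys to the same values,
        // so a Tap reused across Flows produces a consistent configuration.
        conf.put("mapred.output.dir", outputPath);
        conf.put("mapred.output.format.class", "TextOutputFormat");
    }
}

public class SinkConfInitSketch {
    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        SimpleTap tap = new TextLineTap("/tmp/out");
        tap.sinkConfInit(conf);
        tap.sinkConfInit(conf); // second call is harmless, same result
        System.out.println(conf.get("mapred.output.dir")); // prints /tmp/out
    }
}
```

The real examples below follow the same pattern: delegating Taps forward the call to a wrapped Tap, and planners call sinkConfInit on copies of the configuration so each sink's settings stay isolated.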
Code example source: elastic/elasticsearch-hadoop

@Override
public void sinkConfInit(FlowProcess<Object> flowProcess, Object conf) {
    initInnerTapIfNotSetFromFlowProcess(flowProcess);
    actualTap.sinkConfInit(flowProcess, conf);
}
Code example source: cwensel/cascading

@Override
public void sinkConfInit( FlowProcess<? extends Config> flowProcess, Config conf )
  {
  original.sinkConfInit( flowProcess, conf );
  }
Code example source: cwensel/cascading

@Override
public void sinkConfInit( FlowProcess<? extends TConfig> flowProcess, TConfig conf )
  {
  original.sinkConfInit( processProvider.apply( flowProcess ), configProvider.apply( conf ) );
  }
Code example source: cwensel/cascading

private void initFromTraps( FlowProcess<JobConf> flowProcess, JobConf conf, Map<String, Tap> traps )
  {
  if( !traps.isEmpty() )
    {
    JobConf trapConf = HadoopUtil.copyJobConf( conf );

    for( Tap tap : traps.values() )
      tap.sinkConfInit( flowProcess, trapConf );
    }
  }
Code example source: cascading/cascading-hadoop2-mr1

private void initFromTraps( FlowProcess<JobConf> flowProcess, JobConf conf, Map<String, Tap> traps )
  {
  if( !traps.isEmpty() )
    {
    JobConf trapConf = HadoopUtil.copyJobConf( conf );

    for( Tap tap : traps.values() )
      tap.sinkConfInit( flowProcess, trapConf );
    }
  }
Code example source: org.elasticsearch/elasticsearch-hadoop

@Override
public void sinkConfInit(FlowProcess<Object> flowProcess, Object conf) {
    initInnerTapIfNotSetFromFlowProcess(flowProcess);
    actualTap.sinkConfInit(flowProcess, conf);
}
Code example source: cwensel/cascading

protected void initFromTraps( FlowNode flowNode, FlowProcess<? extends Configuration> flowProcess, Configuration conf )
  {
  Map<String, Tap> traps = flowNode.getTrapMap();

  if( !traps.isEmpty() )
    {
    JobConf trapConf = new JobConf( conf );

    for( Tap tap : traps.values() )
      {
      tap.sinkConfInit( flowProcess, trapConf );
      setLocalMode( conf, trapConf, tap );
      }
    }
  }
Code example source: cascading/cascading-hadoop2-tez

protected void initFromTraps( FlowNode flowNode, FlowProcess<? extends Configuration> flowProcess, Configuration conf )
  {
  Map<String, Tap> traps = flowNode.getTrapMap();

  if( !traps.isEmpty() )
    {
    JobConf trapConf = new JobConf( conf );

    for( Tap tap : traps.values() )
      {
      tap.sinkConfInit( flowProcess, trapConf );
      setLocalMode( conf, trapConf, tap );
      }
    }
  }
Code example source: cwensel/cascading

protected void initTaps( FlowProcess<Properties> flowProcess, Properties conf, Set<Tap> taps, boolean isSink )
  {
  if( !taps.isEmpty() )
    {
    for( Tap tap : taps )
      {
      Properties confCopy = flowProcess.copyConfig( conf );

      tapProperties.put( tap, confCopy ); // todo: store the diff, not the copy

      if( isSink )
        tap.sinkConfInit( flowProcess, confCopy );
      else
        tap.sourceConfInit( flowProcess, confCopy );
      }
    }
  }
Code example source: com.twitter/maple

private void initialize() throws IOException {
    tap.sinkConfInit(hadoopFlowProcess, conf);
    OutputFormat outputFormat = conf.getOutputFormat();
    LOG.info("Output format class is: " + outputFormat.getClass().toString());
    writer = outputFormat.getRecordWriter(null, conf, tap.getIdentifier(), Reporter.NULL);
    sinkCall.setOutput(this);
}
Code example source: cwensel/cascading

private void bridge( FlowProcess flowProcess, Object conf )
  {
  childConfigs = new ArrayList<>();

  for( int i = 0; i < getTaps().length; i++ )
    {
    Tap tap = getTaps()[ i ];
    Object newConfig = flowProcess.copyConfig( conf );

    tap.sinkConfInit( flowProcess, newConfig );

    childConfigs.add( flowProcess.diffConfigIntoMap( conf, newConfig ) );
    }
  }
Code example source: cascading/cascading-hadoop2-mr1

protected void initFromSink( FlowProcess<JobConf> flowProcess, JobConf conf )
  {
  // init sink first so tempSink can take precedence
  if( getSink() != null )
    getSink().sinkConfInit( flowProcess, conf );

  Class<? extends OutputFormat> outputFormat = conf.getClass( "mapred.output.format.class", null, OutputFormat.class );
  boolean isFileOutputFormat = false;

  if( outputFormat != null )
    isFileOutputFormat = FileOutputFormat.class.isAssignableFrom( outputFormat );

  Path outputPath = FileOutputFormat.getOutputPath( conf );

  // if no output path is set, we need to substitute an alternative if the OutputFormat is file based
  // PartitionTap won't set the output, but will set an OutputFormat
  // MultiSinkTap won't set the output or set the OutputFormat
  // Non file based OutputFormats don't have an output path, but do have an OutputFormat set (JDBCTap..)
  if( outputPath == null && ( isFileOutputFormat || outputFormat == null ) )
    tempSink = new TempHfs( conf, "tmp:/" + new Path( getSink().getIdentifier() ).toUri().getPath(), true );

  // tempSink exists because sink is writeDirect
  if( tempSink != null )
    tempSink.sinkConfInit( flowProcess, conf );
  }
Code example source: cwensel/cascading

protected void initFromSink( FlowProcess<JobConf> flowProcess, JobConf conf )
  {
  // init sink first so tempSink can take precedence
  if( getSink() != null )
    getSink().sinkConfInit( flowProcess, conf );

  Class<? extends OutputFormat> outputFormat = conf.getClass( "mapred.output.format.class", null, OutputFormat.class );
  boolean isFileOutputFormat = false;

  if( outputFormat != null )
    isFileOutputFormat = FileOutputFormat.class.isAssignableFrom( outputFormat );

  Path outputPath = FileOutputFormat.getOutputPath( conf );

  // if no output path is set, we need to substitute an alternative if the OutputFormat is file based
  // PartitionTap won't set the output, but will set an OutputFormat
  // MultiSinkTap won't set the output or set the OutputFormat
  // Non file based OutputFormats don't have an output path, but do have an OutputFormat set (JDBCTap..)
  if( outputPath == null && ( isFileOutputFormat || outputFormat == null ) )
    tempSink = new TempHfs( conf, "tmp:/" + new Path( getSink().getIdentifier() ).toUri().getPath(), true );

  // tempSink exists because sink is writeDirect
  if( tempSink != null )
    tempSink.sinkConfInit( flowProcess, conf );
  }
Code example source: cwensel/cascading

@Override
public void sinkConfInit( FlowProcess<? extends Configuration> process, Configuration conf )
  {
  Path qualifiedPath = new Path( getFullIdentifier( conf ) );

  HadoopUtil.setOutputPath( conf, qualifiedPath );
  super.sinkConfInit( process, conf );

  makeLocal( conf, qualifiedPath, "forcing job to stand-alone mode, via sink: " );

  TupleSerialization.setSerializations( conf ); // allows Hfs to be used independent of Flow
  }
Code example source: cascading/cascading-hadoop2-io

@Override
public void sinkConfInit( FlowProcess<? extends Configuration> process, Configuration conf )
  {
  Path qualifiedPath = new Path( getFullIdentifier( conf ) );

  HadoopUtil.setOutputPath( conf, qualifiedPath );
  super.sinkConfInit( process, conf );

  makeLocal( conf, qualifiedPath, "forcing job to stand-alone mode, via sink: " );

  TupleSerialization.setSerializations( conf ); // allows Hfs to be used independent of Flow
  }
Code example source: cascading/cascading-jdbc-core

@Override
public void sinkConfInit( FlowProcess<? extends Configuration> process, Configuration conf )
  {
  if( !isSink() )
    return;

  if( username == null )
    DBConfiguration.configureDB( conf, driverClassName, connectionUrl );
  else
    DBConfiguration.configureDB( conf, driverClassName, connectionUrl, username, password );

  super.sinkConfInit( process, conf );
  }
Code example source: cwensel/cascading

protected void initialize() throws IOException
  {
  tap.sinkConfInit( flowProcess, conf );

  OutputFormat outputFormat = asJobConfInstance( conf ).getOutputFormat();

  // todo: use OutputCommitter class
  isFileOutputFormat = outputFormat instanceof FileOutputFormat;

  if( isFileOutputFormat )
    {
    Hadoop18TapUtil.setupJob( conf );
    Hadoop18TapUtil.setupTask( conf );

    int partition = conf.getInt( "mapred.task.partition", conf.getInt( "mapreduce.task.partition", 0 ) );
    long localSequence = sequence == -1 ? 0 : sequence;

    if( prefix != null )
      filename = String.format( filenamePattern, prefix, "/", partition, localSequence );
    else
      filename = String.format( filenamePattern, "", "", partition, localSequence );
    }

  LOG.info( "creating path: {}", filename );

  writer = outputFormat.getRecordWriter( null, asJobConfInstance( conf ), filename, getReporter() );
  }
Code example source: cascading/cascading-hadoop2-io

protected void initialize() throws IOException
  {
  tap.sinkConfInit( flowProcess, conf );

  OutputFormat outputFormat = asJobConfInstance( conf ).getOutputFormat();

  // todo: use OutputCommitter class
  isFileOutputFormat = outputFormat instanceof FileOutputFormat;

  if( isFileOutputFormat )
    {
    Hadoop18TapUtil.setupJob( conf );
    Hadoop18TapUtil.setupTask( conf );

    int partition = conf.getInt( "mapred.task.partition", conf.getInt( "mapreduce.task.partition", 0 ) );
    long localSequence = sequence == -1 ? 0 : sequence;

    if( prefix != null )
      filename = String.format( filenamePattern, prefix, "/", partition, localSequence );
    else
      filename = String.format( filenamePattern, "", "", partition, localSequence );
    }

  LOG.info( "creating path: {}", filename );

  writer = outputFormat.getRecordWriter( null, asJobConfInstance( conf ), filename, getReporter() );
  }
Code example source: com.hotels/plunger

private void writeToHadoopTap(Tap<?, ?, ?> tap) throws IOException {
    @SuppressWarnings("unchecked")
    Tap<JobConf, ?, ?> hadoopTap = (Tap<JobConf, ?, ?>) tap;
    JobConf conf = new JobConf();
    HadoopFlowProcess flowProcess = new HadoopFlowProcess(conf);
    hadoopTap.sinkConfInit(flowProcess, conf);
    TupleEntryCollector collector = hadoopTap.openForWrite(flowProcess);
    for (TupleEntry tuple : data.asTupleEntryList()) {
        collector.add(tuple);
    }
    collector.close();
}
Code example source: com.hotels/plunger

private void writeToLocalTap(Tap<?, ?, ?> tap) throws IOException {
    @SuppressWarnings("unchecked")
    Tap<Properties, ?, ?> localTap = (Tap<Properties, ?, ?>) tap;
    Properties conf = new Properties();
    LocalFlowProcess flowProcess = new LocalFlowProcess(conf);
    flowProcess.setStepStats(new LocalStepStats(new NullFlowStep(), NullClientState.INSTANCE));
    localTap.sinkConfInit(flowProcess, conf);
    TupleEntryCollector collector = localTap.openForWrite(flowProcess);
    for (TupleEntry tuple : data.asTupleEntryList()) {
        collector.add(tuple);
    }
    collector.close();
    localTap.commitResource(conf);
}