本文整理了Java中org.apache.flink.runtime.execution.Environment.getJobID()
方法的一些代码示例,展示了Environment.getJobID()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Environment.getJobID()
方法的具体详情如下:
包路径:org.apache.flink.runtime.execution.Environment
类名称:Environment
方法名:getJobID
[英]Returns the ID of the job that the task belongs to.
[中]返回任务所属作业的ID。
代码示例来源:origin: apache/flink
this.jobId = env.getJobID();
代码示例来源:origin: apache/flink
@Override
public void init() throws Exception {
final String iterationId = getConfiguration().getIterationId();
if (iterationId == null || iterationId.length() == 0) {
throw new Exception("Missing iteration ID in the task configuration");
}
final String brokerID = StreamIterationHead.createBrokerIdString(getEnvironment().getJobID(), iterationId,
getEnvironment().getTaskInfo().getIndexOfThisSubtask());
final long iterationWaitTime = getConfiguration().getIterationWaitTime();
LOG.info("Iteration tail {} trying to acquire feedback queue under {}", getName(), brokerID);
@SuppressWarnings("unchecked")
BlockingQueue<StreamRecord<IN>> dataChannel =
(BlockingQueue<StreamRecord<IN>>) BlockingQueueBroker.INSTANCE.get(brokerID);
LOG.info("Iteration tail {} acquired feedback queue {}", getName(), brokerID);
this.headOperator = new RecordPusher<>();
this.headOperator.setup(this, getConfiguration(), new IterationTailOutput<>(dataChannel, iterationWaitTime));
// call super.init() last because that needs this.headOperator to be set up
super.init();
}
代码示例来源:origin: apache/flink
final String brokerID = createBrokerIdString(getEnvironment().getJobID(), iterationId ,
getEnvironment().getTaskInfo().getIndexOfThisSubtask());
代码示例来源:origin: apache/flink
() -> stateBackend.createKeyedStateBackend(
environment,
environment.getJobID(),
operatorIdentifierText,
keySerializer,
代码示例来源:origin: apache/flink
checkpointStorage = stateBackend.createCheckpointStorage(getEnvironment().getJobID());
代码示例来源:origin: org.apache.flink/flink-statebackend-rocksdb_2.11
this.jobId = env.getJobID();
代码示例来源:origin: org.apache.flink/flink-statebackend-rocksdb_2.10
this.jobId = env.getJobID();
代码示例来源:origin: org.apache.flink/flink-streaming-java_2.10
public CheckpointStreamFactory createSavepointStreamFactory(StreamOperator<?> operator, String targetLocation) throws IOException {
return stateBackend.createSavepointStreamFactory(
getEnvironment().getJobID(),
createOperatorIdentifier(operator, configuration.getVertexID()),
targetLocation);
}
代码示例来源:origin: org.apache.flink/flink-streaming-java_2.10
/**
* This is only visible because
* {@link org.apache.flink.streaming.runtime.operators.GenericWriteAheadSink} uses the
* checkpoint stream factory to write write-ahead logs. <b>This should not be used for
* anything else.</b>
*/
public CheckpointStreamFactory createCheckpointStreamFactory(StreamOperator<?> operator) throws IOException {
return stateBackend.createStreamFactory(
getEnvironment().getJobID(),
createOperatorIdentifier(operator, configuration.getVertexID()));
}
代码示例来源:origin: org.apache.flink/flink-runtime_2.10
public String brokerKey() {
if (brokerKey == null) {
int iterationId = config.getIterationId();
brokerKey = getEnvironment().getJobID().toString() + '#' + iterationId + '#' +
getEnvironment().getTaskInfo().getIndexOfThisSubtask();
}
return brokerKey;
}
代码示例来源:origin: org.apache.flink/flink-runtime
public String brokerKey() {
if (brokerKey == null) {
int iterationId = config.getIterationId();
brokerKey = getEnvironment().getJobID().toString() + '#' + iterationId + '#' +
getEnvironment().getTaskInfo().getIndexOfThisSubtask();
}
return brokerKey;
}
代码示例来源:origin: org.apache.flink/flink-runtime_2.11
public String brokerKey() {
if (brokerKey == null) {
int iterationId = config.getIterationId();
brokerKey = getEnvironment().getJobID().toString() + '#' + iterationId + '#' +
getEnvironment().getTaskInfo().getIndexOfThisSubtask();
}
return brokerKey;
}
代码示例来源:origin: com.alibaba.blink/flink-runtime
public String brokerKey() {
if (brokerKey == null) {
int iterationId = config.getIterationId();
brokerKey = getEnvironment().getJobID().toString() + '#' + iterationId + '#' +
getEnvironment().getTaskInfo().getIndexOfThisSubtask();
}
return brokerKey;
}
代码示例来源:origin: org.apache.flink/flink-streaming-java
@Override
public void init() throws Exception {
final String iterationId = getConfiguration().getIterationId();
if (iterationId == null || iterationId.length() == 0) {
throw new Exception("Missing iteration ID in the task configuration");
}
final String brokerID = StreamIterationHead.createBrokerIdString(getEnvironment().getJobID(), iterationId,
getEnvironment().getTaskInfo().getIndexOfThisSubtask());
final long iterationWaitTime = getConfiguration().getIterationWaitTime();
LOG.info("Iteration tail {} trying to acquire feedback queue under {}", getName(), brokerID);
@SuppressWarnings("unchecked")
BlockingQueue<StreamRecord<IN>> dataChannel =
(BlockingQueue<StreamRecord<IN>>) BlockingQueueBroker.INSTANCE.get(brokerID);
LOG.info("Iteration tail {} acquired feedback queue {}", getName(), brokerID);
this.headOperator = new RecordPusher<>();
this.headOperator.setup(this, getConfiguration(), new IterationTailOutput<>(dataChannel, iterationWaitTime));
// call super.init() last because that needs this.headOperator to be set up
super.init();
}
代码示例来源:origin: org.apache.flink/flink-streaming-java_2.11
@Override
public void init() throws Exception {
final String iterationId = getConfiguration().getIterationId();
if (iterationId == null || iterationId.length() == 0) {
throw new Exception("Missing iteration ID in the task configuration");
}
final String brokerID = StreamIterationHead.createBrokerIdString(getEnvironment().getJobID(), iterationId,
getEnvironment().getTaskInfo().getIndexOfThisSubtask());
final long iterationWaitTime = getConfiguration().getIterationWaitTime();
LOG.info("Iteration tail {} trying to acquire feedback queue under {}", getName(), brokerID);
@SuppressWarnings("unchecked")
BlockingQueue<StreamRecord<IN>> dataChannel =
(BlockingQueue<StreamRecord<IN>>) BlockingQueueBroker.INSTANCE.get(brokerID);
LOG.info("Iteration tail {} acquired feedback queue {}", getName(), brokerID);
this.headOperator = new RecordPusher<>();
this.headOperator.setup(this, getConfiguration(), new IterationTailOutput<>(dataChannel, iterationWaitTime));
// call super.init() last because that needs this.headOperator to be set up
super.init();
}
代码示例来源:origin: org.apache.flink/flink-streaming-java_2.10
@Override
public void init() throws Exception {
final String iterationId = getConfiguration().getIterationId();
if (iterationId == null || iterationId.length() == 0) {
throw new Exception("Missing iteration ID in the task configuration");
}
final String brokerID = StreamIterationHead.createBrokerIdString(getEnvironment().getJobID(), iterationId,
getEnvironment().getTaskInfo().getIndexOfThisSubtask());
final long iterationWaitTime = getConfiguration().getIterationWaitTime();
LOG.info("Iteration tail {} trying to acquire feedback queue under {}", getName(), brokerID);
@SuppressWarnings("unchecked")
BlockingQueue<StreamRecord<IN>> dataChannel =
(BlockingQueue<StreamRecord<IN>>) BlockingQueueBroker.INSTANCE.get(brokerID);
LOG.info("Iteration tail {} acquired feedback queue {}", getName(), brokerID);
this.headOperator = new RecordPusher<>();
this.headOperator.setup(this, getConfiguration(), new IterationTailOutput<>(dataChannel, iterationWaitTime));
// call super.init() last because that needs this.headOperator to be set up
super.init();
}
代码示例来源:origin: org.apache.flink/flink-streaming-java
final String brokerID = createBrokerIdString(getEnvironment().getJobID(), iterationId ,
getEnvironment().getTaskInfo().getIndexOfThisSubtask());
代码示例来源:origin: org.apache.flink/flink-streaming-java_2.10
public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
TypeSerializer<K> keySerializer,
int numberOfKeyGroups,
KeyGroupRange keyGroupRange) throws Exception {
if (keyedStateBackend != null) {
throw new RuntimeException("The keyed state backend can only be created once.");
}
String operatorIdentifier = createOperatorIdentifier(
headOperator,
configuration.getVertexID());
keyedStateBackend = stateBackend.createKeyedStateBackend(
getEnvironment(),
getEnvironment().getJobID(),
operatorIdentifier,
keySerializer,
numberOfKeyGroups,
keyGroupRange,
getEnvironment().getTaskKvStateRegistry());
// let keyed state backend participate in the operator lifecycle, i.e. make it responsive to cancelation
cancelables.registerClosable(keyedStateBackend);
// restore if we have some old state
Collection<KeyedStateHandle> restoreKeyedStateHandles =
restoreStateHandles == null ? null : restoreStateHandles.getManagedKeyedState();
keyedStateBackend.restore(restoreKeyedStateHandles);
@SuppressWarnings("unchecked")
AbstractKeyedStateBackend<K> typedBackend = (AbstractKeyedStateBackend<K>) keyedStateBackend;
return typedBackend;
}
代码示例来源:origin: org.apache.flink/flink-streaming-java
() -> stateBackend.createKeyedStateBackend(
environment,
environment.getJobID(),
operatorIdentifierText,
keySerializer,
代码示例来源:origin: org.apache.flink/flink-streaming-java_2.11
() -> stateBackend.createKeyedStateBackend(
environment,
environment.getJobID(),
operatorIdentifierText,
keySerializer,
内容来源于网络,如有侵权,请联系作者删除!