本文整理了Java中org.apache.flink.yarn.highavailability.YarnPreConfiguredMasterNonHaServices
类的一些代码示例,展示了YarnPreConfiguredMasterNonHaServices
类的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。YarnPreConfiguredMasterNonHaServices
类的具体详情如下:
包路径:org.apache.flink.yarn.highavailability.YarnPreConfiguredMasterNonHaServices
类名称:YarnPreConfiguredMasterNonHaServices
[英]These YarnHighAvailabilityServices are for use by the TaskManager in setups, where there is one ResourceManager that is statically configured in the Flink configuration.
Internally, these services put their recovery data into YARN's working directory, except for checkpoints, which are in the configured checkpoint directory. That way, checkpoints can be resumed with a new job/application, even if the complete YARN application is killed and cleaned up.
A typical YARN setup that uses these HA services first starts the ResourceManager inside the ApplicationMaster and puts its RPC endpoint address into the configuration with which the TaskManagers are started. Because of this static addressing scheme, the setup cannot handle failures of the JobManager and ResourceManager, which are running as part of the Application Master.
[中]这些YarnHighAvailabilityServices供TaskManager在设置中使用,其中有一个ResourceManager是在Flink配置中静态配置的。
####处理故障类型
*用户代码和操作员故障:从检查点恢复失败的操作员。
*任务管理器失败:重新启动失败的任务管理器,并从检查点恢复其任务。
####不可恢复故障类型
*应用程序主机故障:这些故障无法恢复,因为TaskManager无法发现新的应用程序主机地址。
在内部,这些服务将其恢复数据放入Thread的工作目录,但检查点除外,检查点位于已配置的检查点目录中。这样,即使完整的纱线应用程序被终止和清理,也可以使用新的作业/应用程序恢复检查点。
使用这些HA服务的典型Thread设置首先在ApplicationMaster中启动ResourceManager,并将其RPC端点地址放入启动TaskManager的配置中。由于这种静态寻址方案,安装程序无法处理JobManager和ResourceManager的故障,它们作为应用程序主机的一部分运行。
代码示例来源:origin: apache/flink
@Override
public LeaderElectionService getDispatcherLeaderElectionService() {
enter();
try {
throw new UnsupportedOperationException("Not supported on the TaskManager side");
} finally {
exit();
}
}
代码示例来源:origin: apache/flink
switch (mode) {
case NONE:
return new YarnPreConfiguredMasterNonHaServices(
flinkConfig,
hadoopConfig,
代码示例来源:origin: apache/flink
new YarnPreConfiguredMasterNonHaServices(
flinkConfig,
hadoopConfig,
new YarnPreConfiguredMasterNonHaServices(
flinkConfig,
hadoopConfig,
new YarnPreConfiguredMasterNonHaServices(
flinkConfig,
hadoopConfig,
HighAvailabilityServicesUtils.AddressResolution.NO_ADDRESS_RESOLUTION).closeAndCleanupAllData();
代码示例来源:origin: apache/flink
@Test
public void testCloseAndCleanup() throws Exception {
final Configuration flinkConfig = new Configuration();
flinkConfig.setString(YarnConfigOptions.APP_MASTER_RPC_ADDRESS, "localhost");
flinkConfig.setInteger(YarnConfigOptions.APP_MASTER_RPC_PORT, 1427);
// create the services
YarnHighAvailabilityServices services = new YarnPreConfiguredMasterNonHaServices(
flinkConfig,
hadoopConfig,
HighAvailabilityServicesUtils.AddressResolution.NO_ADDRESS_RESOLUTION);
services.closeAndCleanupAllData();
final FileSystem fileSystem = hdfsRootPath.getFileSystem();
final Path workDir = new Path(hdfsCluster.getFileSystem().getWorkingDirectory().toString());
try {
fileSystem.getFileStatus(new Path(workDir, YarnHighAvailabilityServices.FLINK_RECOVERY_DATA_DIR));
fail("Flink recovery data directory still exists");
}
catch (FileNotFoundException e) {
// expected, because the directory should have been cleaned up
}
assertTrue(services.isClosed());
// doing another cleanup when the services are closed should fail
try {
services.closeAndCleanupAllData();
fail("should fail with an IllegalStateException");
} catch (IllegalStateException e) {
// expected
}
}
代码示例来源:origin: apache/flink
@Override
public LeaderElectionService getWebMonitorLeaderElectionService() {
enter();
try {
throw new UnsupportedOperationException();
}
finally {
exit();
}
}
代码示例来源:origin: apache/flink
flinkConfig.setInteger(YarnConfigOptions.APP_MASTER_RPC_PORT, 1427);
YarnHighAvailabilityServices services = new YarnPreConfiguredMasterNonHaServices(
flinkConfig,
hadoopConfig,
代码示例来源:origin: apache/flink
@Override
public LeaderElectionService getResourceManagerLeaderElectionService() {
enter();
try {
throw new UnsupportedOperationException("Not supported on the TaskManager side");
}
finally {
exit();
}
}
代码示例来源:origin: org.apache.flink/flink-yarn_2.11
switch (mode) {
case NONE:
return new YarnPreConfiguredMasterNonHaServices(
flinkConfig,
hadoopConfig,
代码示例来源:origin: apache/flink
@Override
public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) {
enter();
try {
throw new UnsupportedOperationException("needs refactoring to accept default address");
}
finally {
exit();
}
}
代码示例来源:origin: org.apache.flink/flink-yarn
switch (mode) {
case NONE:
return new YarnPreConfiguredMasterNonHaServices(
flinkConfig,
hadoopConfig,
代码示例来源:origin: apache/flink
@Override
public LeaderElectionService getJobManagerLeaderElectionService(JobID jobID) {
enter();
try {
throw new UnsupportedOperationException("needs refactoring to accept default address");
}
finally {
exit();
}
}
代码示例来源:origin: apache/flink
@Override
public LeaderRetrievalService getWebMonitorLeaderRetriever() {
enter();
try {
throw new UnsupportedOperationException();
}
finally {
exit();
}
}
}
代码示例来源:origin: apache/flink
@Override
public LeaderRetrievalService getDispatcherLeaderRetriever() {
enter();
try {
return new StandaloneLeaderRetrievalService(dispatcherRpcUrl, DEFAULT_LEADER_ID);
} finally {
exit();
}
}
代码示例来源:origin: apache/flink
@Override
public LeaderRetrievalService getResourceManagerLeaderRetriever() {
enter();
try {
return new StandaloneLeaderRetrievalService(resourceManagerRpcUrl, DEFAULT_LEADER_ID);
}
finally {
exit();
}
}
代码示例来源:origin: apache/flink
@Override
public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID, String defaultJobManagerAddress) {
enter();
try {
return new StandaloneLeaderRetrievalService(defaultJobManagerAddress, DEFAULT_LEADER_ID);
} finally {
exit();
}
}
代码示例来源:origin: org.apache.flink/flink-yarn
@Override
public LeaderRetrievalService getWebMonitorLeaderRetriever() {
enter();
try {
throw new UnsupportedOperationException();
}
finally {
exit();
}
}
}
代码示例来源:origin: org.apache.flink/flink-yarn_2.11
@Override
public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) {
enter();
try {
throw new UnsupportedOperationException("needs refactoring to accept default address");
}
finally {
exit();
}
}
代码示例来源:origin: org.apache.flink/flink-yarn_2.11
@Override
public LeaderElectionService getDispatcherLeaderElectionService() {
enter();
try {
throw new UnsupportedOperationException("Not supported on the TaskManager side");
} finally {
exit();
}
}
代码示例来源:origin: org.apache.flink/flink-yarn_2.11
@Override
public LeaderRetrievalService getWebMonitorLeaderRetriever() {
enter();
try {
throw new UnsupportedOperationException();
}
finally {
exit();
}
}
}
代码示例来源:origin: org.apache.flink/flink-yarn
@Override
public LeaderElectionService getJobManagerLeaderElectionService(JobID jobID) {
enter();
try {
throw new UnsupportedOperationException("needs refactoring to accept default address");
}
finally {
exit();
}
}
内容来源于网络,如有侵权,请联系作者删除!