本文整理了Java中org.apache.flink.yarn.YarnClusterDescriptor
类的一些代码示例,展示了YarnClusterDescriptor
类的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。YarnClusterDescriptor
类的具体详情如下:
包路径:org.apache.flink.yarn.YarnClusterDescriptor
类名称:YarnClusterDescriptor
[英]Implementation of AbstractYarnClusterDescriptor which is used to start the application master.
[中]AbstractYanClusterDescriptor的实现,用于启动应用程序主机。
代码示例来源:origin: apache/flink
@Test
public void testFailIfTaskSlotsHigherThanMaxVcores() throws ClusterDeploymentException {
final Configuration flinkConfiguration = new Configuration();
flinkConfiguration.setInteger(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_MIN, 0);
YarnClusterDescriptor clusterDescriptor = new YarnClusterDescriptor(
flinkConfiguration,
yarnConfiguration,
temporaryFolder.getRoot().getAbsolutePath(),
yarnClient,
true);
clusterDescriptor.setLocalJarPath(new Path(flinkJar.getPath()));
ClusterSpecification clusterSpecification = new ClusterSpecification.ClusterSpecificationBuilder()
.setMasterMemoryMB(1)
.setTaskManagerMemoryMB(1)
.setNumberTaskManagers(1)
.setSlotsPerTaskManager(Integer.MAX_VALUE)
.createClusterSpecification();
try {
clusterDescriptor.deploySessionCluster(clusterSpecification);
fail("The deploy call should have failed.");
} catch (ClusterDeploymentException e) {
// we expect the cause to be an IllegalConfigurationException
if (!(e.getCause() instanceof IllegalConfigurationException)) {
throw e;
}
} finally {
clusterDescriptor.close();
}
}
代码示例来源:origin: apache/flink
@Nonnull
YarnClusterDescriptor createYarnClusterDescriptor(org.apache.flink.configuration.Configuration flinkConfiguration) {
final YarnClusterDescriptor yarnClusterDescriptor = new YarnClusterDescriptor(
flinkConfiguration,
YARN_CONFIGURATION,
CliFrontend.getConfigurationDirectoryFromEnv(),
yarnClient,
true);
yarnClusterDescriptor.setLocalJarPath(new Path(flinkUberjar.toURI()));
yarnClusterDescriptor.addShipFiles(Collections.singletonList(flinkLibFolder));
return yarnClusterDescriptor;
}
代码示例来源:origin: apache/flink
@Override
public ClusterClient<ApplicationId> deployJobCluster(
ClusterSpecification clusterSpecification,
JobGraph jobGraph,
boolean detached) throws ClusterDeploymentException {
// this is required because the slots are allocated lazily
jobGraph.setAllowQueuedScheduling(true);
try {
return deployInternal(
clusterSpecification,
"Flink per-job cluster",
getYarnJobClusterEntrypoint(),
jobGraph,
detached);
} catch (Exception e) {
throw new ClusterDeploymentException("Could not deploy Yarn job cluster.", e);
}
}
代码示例来源:origin: apache/flink
try (YarnClusterDescriptor descriptor = new YarnClusterDescriptor(
new Configuration(),
yarnConfiguration,
yarnClient,
true)) {
descriptor.setLocalJarPath(new Path("/path/to/flink.jar"));
shipFiles.add(libFolder);
descriptor.addShipFiles(shipFiles);
descriptor.addLibFolderToShipFiles(effectiveShipFiles);
代码示例来源:origin: apache/flink
final YarnClient yarnClient = getYarnClient();
try (final YarnClusterDescriptor yarnClusterDescriptor = new YarnClusterDescriptor(
configuration,
getYarnConfiguration(),
true)) {
yarnClusterDescriptor.setLocalJarPath(new Path(flinkUberjar.getAbsolutePath()));
yarnClusterDescriptor.addShipFiles(Arrays.asList(flinkLibFolder.listFiles()));
yarnClusterDescriptor.addShipFiles(Arrays.asList(flinkShadedHadoopDir.listFiles()));
clusterClient = yarnClusterDescriptor.deployJobCluster(
clusterSpecification,
jobGraph,
yarnClusterDescriptor.killCluster(applicationId);
代码示例来源:origin: apache/flink
@Test
public void testSetupApplicationMasterContainer() {
Configuration cfg = new Configuration();
YarnClusterDescriptor clusterDescriptor = new YarnClusterDescriptor(
cfg,
yarnConfiguration,
final String log4j =
"-Dlog4j.configuration=file:" + FlinkYarnSessionCli.CONFIG_FILE_LOG4J_NAME; // if set
final String mainClass = clusterDescriptor.getYarnSessionClusterEntrypoint();
final String args = "";
final String redirects =
" " + mainClass + " " + args + " " + redirects,
clusterDescriptor
.setupApplicationMasterContainer(
mainClass,
false,
" " + mainClass + " " + args + " " + redirects,
clusterDescriptor
.setupApplicationMasterContainer(
mainClass,
false,
" " + mainClass + " " + args + " " + redirects,
clusterDescriptor
.setupApplicationMasterContainer(
mainClass,
true,
代码示例来源:origin: apache/flink
private AbstractYarnClusterDescriptor getClusterDescriptor(
Configuration configuration,
YarnConfiguration yarnConfiguration,
String configurationDirectory) {
final YarnClient yarnClient = YarnClient.createYarnClient();
yarnClient.init(yarnConfiguration);
yarnClient.start();
return new YarnClusterDescriptor(
configuration,
yarnConfiguration,
configurationDirectory,
yarnClient,
false);
}
}
代码示例来源:origin: apache/flink
YarnClusterDescriptor yarnClusterDescriptor = new YarnClusterDescriptor(
new Configuration(),
yarnConfiguration,
true);
yarnClusterDescriptor.close();
closableYarnClient.start();
yarnClusterDescriptor = new YarnClusterDescriptor(
new Configuration(),
yarnConfiguration,
false);
yarnClusterDescriptor.close();
代码示例来源:origin: apache/flink
try (YarnClusterDescriptor descriptor = new YarnClusterDescriptor(
new Configuration(),
yarnConfiguration,
CommonTestUtils.setEnv(env);
descriptor.addLibFolderToShipFiles(effectiveShipFiles);
} finally {
CommonTestUtils.setEnv(oldEnv);
代码示例来源:origin: apache/flink
/**
* Tests that Yarn will restart a killed {@link YarnSessionClusterEntrypoint} which will then resume
* a persisted {@link JobGraph}.
*/
@Test
public void testKillYarnSessionClusterEntrypoint() throws Exception {
assumeTrue(
"This test kills processes via the pkill command. Thus, it only runs on Linux, Mac OS, Free BSD and Solaris.",
OperatingSystem.isLinux() || OperatingSystem.isMac() || OperatingSystem.isFreeBSD() || OperatingSystem.isSolaris());
final YarnClusterDescriptor yarnClusterDescriptor = setupYarnClusterDescriptor();
yarnClusterDescriptor.addShipFiles(Arrays.asList(flinkShadedHadoopDir.listFiles()));
final RestClusterClient<ApplicationId> restClusterClient = deploySessionCluster(yarnClusterDescriptor);
try {
final JobID jobId = submitJob(restClusterClient);
final ApplicationId id = restClusterClient.getClusterId();
waitUntilJobIsRunning(restClusterClient, jobId);
killApplicationMaster(yarnClusterDescriptor.getYarnSessionClusterEntrypoint());
waitForApplicationAttempt(id, 2);
waitForJobTermination(restClusterClient, jobId);
killApplicationAndWait(id);
} finally {
restClusterClient.shutdown();
}
}
代码示例来源:origin: apache/flink
private RestClusterClient<ApplicationId> deploySessionCluster(YarnClusterDescriptor yarnClusterDescriptor) throws ClusterDeploymentException {
final int containerMemory = 256;
final ClusterClient<ApplicationId> yarnClusterClient = yarnClusterDescriptor.deploySessionCluster(
new ClusterSpecification.ClusterSpecificationBuilder()
.setMasterMemoryMB(containerMemory)
.setTaskManagerMemoryMB(containerMemory)
.setSlotsPerTaskManager(1)
.createClusterSpecification());
assertThat(yarnClusterClient, is(instanceOf(RestClusterClient.class)));
return (RestClusterClient<ApplicationId>) yarnClusterClient;
}
代码示例来源:origin: apache/flink
@Test
public void testJobRecoversAfterKillingTaskManager() throws Exception {
final YarnClusterDescriptor yarnClusterDescriptor = setupYarnClusterDescriptor();
yarnClusterDescriptor.addShipFiles(Arrays.asList(flinkShadedHadoopDir.listFiles()));
final RestClusterClient<ApplicationId> restClusterClient = deploySessionCluster(yarnClusterDescriptor);
try {
final JobID jobId = submitJob(restClusterClient);
waitUntilJobIsRunning(restClusterClient, jobId);
stopTaskManagerContainer();
waitUntilJobIsRestarted(restClusterClient, jobId, 1);
waitForJobTermination(restClusterClient, jobId);
killApplicationAndWait(restClusterClient.getClusterId());
} finally {
restClusterClient.shutdown();
}
}
代码示例来源:origin: apache/flink
final YarnClusterDescriptor clusterDescriptor = new YarnClusterDescriptor(
configuration,
yarnConfiguration,
true);
clusterDescriptor.setLocalJarPath(new Path(flinkUberjar.getAbsolutePath()));
clusterDescriptor.addShipFiles(Arrays.asList(flinkLibFolder.listFiles()));
clusterDescriptor.addShipFiles(Arrays.asList(flinkShadedHadoopDir.listFiles()));
.createClusterSpecification();
final ClusterClient<ApplicationId> clusterClient = clusterDescriptor.deployJobCluster(clusterSpecification, jobGraph, true);
clusterDescriptor.killCluster(clusterId);
clusterDescriptor.close();
代码示例来源:origin: DTStack/flinkStreamSQL
private AbstractYarnClusterDescriptor getClusterDescriptor(
Configuration configuration,
YarnConfiguration yarnConfiguration,
String configurationDirectory) {
return new YarnClusterDescriptor(
configuration,
yarnConfiguration,
configurationDirectory,
yarnClient,
false);
}
}
代码示例来源:origin: apache/flink
configuration.setInteger(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_MIN, 0);
YarnClusterDescriptor clusterDescriptor = new YarnClusterDescriptor(
configuration,
yarnConfiguration,
true);
clusterDescriptor.setLocalJarPath(new Path(flinkJar.getPath()));
clusterDescriptor.deploySessionCluster(clusterSpecification);
clusterDescriptor.close();
代码示例来源:origin: org.apache.flink/flink-yarn_2.11
private AbstractYarnClusterDescriptor getClusterDescriptor(
Configuration configuration,
YarnConfiguration yarnConfiguration,
String configurationDirectory) {
final YarnClient yarnClient = YarnClient.createYarnClient();
yarnClient.init(yarnConfiguration);
yarnClient.start();
return new YarnClusterDescriptor(
configuration,
yarnConfiguration,
configurationDirectory,
yarnClient,
false);
}
}
代码示例来源:origin: org.apache.flink/flink-yarn_2.11
@Override
public ClusterClient<ApplicationId> deployJobCluster(
ClusterSpecification clusterSpecification,
JobGraph jobGraph,
boolean detached) throws ClusterDeploymentException {
// this is required because the slots are allocated lazily
jobGraph.setAllowQueuedScheduling(true);
try {
return deployInternal(
clusterSpecification,
"Flink per-job cluster",
getYarnJobClusterEntrypoint(),
jobGraph,
detached);
} catch (Exception e) {
throw new ClusterDeploymentException("Could not deploy Yarn job cluster.", e);
}
}
代码示例来源:origin: org.apache.flink/flink-yarn
private AbstractYarnClusterDescriptor getClusterDescriptor(
Configuration configuration,
YarnConfiguration yarnConfiguration,
String configurationDirectory) {
final YarnClient yarnClient = YarnClient.createYarnClient();
yarnClient.init(yarnConfiguration);
yarnClient.start();
return new YarnClusterDescriptor(
configuration,
yarnConfiguration,
configurationDirectory,
yarnClient,
false);
}
}
代码示例来源:origin: org.apache.flink/flink-yarn
@Override
public ClusterClient<ApplicationId> deployJobCluster(
ClusterSpecification clusterSpecification,
JobGraph jobGraph,
boolean detached) throws ClusterDeploymentException {
// this is required because the slots are allocated lazily
jobGraph.setAllowQueuedScheduling(true);
try {
return deployInternal(
clusterSpecification,
"Flink per-job cluster",
getYarnJobClusterEntrypoint(),
jobGraph,
detached);
} catch (Exception e) {
throw new ClusterDeploymentException("Could not deploy Yarn job cluster.", e);
}
}
代码示例来源:origin: DTStack/flinkStreamSQL
AbstractYarnClusterDescriptor clusterDescriptor = new YarnClusterDescriptor(config, yarnConf, ".", yarnClient, false);
ClusterClient clusterClient = clusterDescriptor.retrieve(applicationId);
clusterClient.setDetached(true);
内容来源于网络,如有侵权,请联系作者删除!