[Flink] Accessing Kerberos-Enabled Kafka Across Clusters from Flink


1. Overview

Reposted: Accessing Kerberos-Enabled Kafka Across Clusters from Flink

Flink provides three modules for authenticating against the cluster: HadoopModule, JaasModule and ZooKeeperModule. The corresponding security options are defined in the SecurityOptions class.

HadoopModule configures authentication for frameworks that authenticate through UserGroupInformation (Kudu, the synchronous HBase framework, HDFS, etc.).
JaasModule configures authentication for frameworks that authenticate through a JAAS configuration (Kafka, ZooKeeper, the asynchronous HBase framework, etc.).
ZooKeeperModule installs the process-wide ZooKeeper security configuration.

When a Flink component starts, it first loads these authentication modules and then starts the cluster components inside the security context that was built. Note that the whole Flink cluster can only use a single set of credentials: if a Flink job reads from Kerberos-enabled Kafka and writes to Kudu, the configured principal and keytab must have access to HDFS, Kafka and Kudu at the same time. If different credentials are required, Kerberos has to be configured separately inside the Flink job.
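
The options referenced above live in org.apache.flink.configuration.SecurityOptions and are usually set in flink-conf.yaml; they can also be set programmatically on a Configuration. A minimal sketch (keytab path and principal below are placeholders):

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.SecurityOptions;

public class KerberosConf {
    public static Configuration kerberosConfig() {
        Configuration conf = new Configuration();
        // keytab and principal consumed by HadoopModule / JaasModule (placeholder values)
        conf.setString(SecurityOptions.KERBEROS_LOGIN_KEYTAB, "/etc/security/keytabs/flink.keytab");
        conf.setString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL, "flink@EXAMPLE.COM");
        // JAAS login contexts that JaasModule will wire to the Kerberos entry
        conf.setString(SecurityOptions.KERBEROS_LOGIN_CONTEXTS, "Client,KafkaClient");
        return conf;
    }
}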

2. Startup Flow

Taking the JobManager startup as an example, let's walk through how the security components are loaded and record the call chain of each module.

The security context is installed first, and the cluster components are then started from inside it.

ClusterEntrypoint#startCluster

SecurityContext securityContext = installSecurityContext(configuration);
securityContext.runSecured((Callable<Void>) () -> {
    runCluster(configuration);
    return null;
});

SecurityContext installSecurityContext(Configuration configuration) {
    SecurityUtils.install(new SecurityConfiguration(configuration));
    return SecurityUtils.getInstalledContext();
}

SecurityConfiguration provides two default security.context.factory.classes used to build the SecurityContext:
org.apache.flink.runtime.security.contexts.HadoopSecurityContextFactory: starts the cluster inside doAs() of the constructed UserGroupInformation.
org.apache.flink.runtime.security.contexts.NoOpSecurityContextFactory: the default; starts the cluster in a context that performs no authentication.
It also provides three security.module.factory.classes used to prepare the configuration needed for authentication:

org.apache.flink.runtime.security.modules.HadoopModuleFactory
org.apache.flink.runtime.security.modules.JaasModuleFactory
org.apache.flink.runtime.security.modules.ZookeeperModuleFactory

The main work happens in installModules, which prepares the configuration used for authentication.

SecurityUtils#install

public static void install(SecurityConfiguration config) throws Exception {
    // Install the security modules first before installing the security context
    installModules(config);
    installContext(config);
}

// installModules creates each module factory via SPI and calls install() on the resulting module
static void installModules(SecurityConfiguration config) throws Exception {
    List<SecurityModule> modules = new ArrayList<>();
    for (String moduleFactoryClass : config.getSecurityModuleFactories()) {
        SecurityModuleFactory moduleFactory =
                SecurityFactoryServiceLoader.findModuleFactory(moduleFactoryClass);
        SecurityModule module = moduleFactory.createModule(config);
        // can be null if the module is not supported in the current environment
        if (module != null) {
            module.install();
            modules.add(module);
        }
    }
    installedModules = modules;
}

HadoopModule#install. Kerberos authentication for HDFS requires a UserGroupInformation login user; this module builds the process-wide loginUser. If other components can authenticate with this loginUser, they do not need their own credentials.

public void install() throws SecurityInstallException {
    // pass the Hadoop configuration through
    UserGroupInformation.setConfiguration(hadoopConfiguration);
    UserGroupInformation loginUser;
    try {
        // Kerberos is enabled and both a keytab and a principal are configured
        if (UserGroupInformation.isSecurityEnabled()
                && !StringUtils.isBlank(securityConfig.getKeytab())
                && !StringUtils.isBlank(securityConfig.getPrincipal())) {
            String keytabPath = (new File(securityConfig.getKeytab())).getAbsolutePath();
            UserGroupInformation.loginUserFromKeytab(securityConfig.getPrincipal(), keytabPath);
            // the currently logged-in user
            loginUser = UserGroupInformation.getLoginUser();

            // token cache
            // supplement with any available tokens
            String fileLocation = System.getenv(UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION);
            if (fileLocation != null) {
                // Use reflection API since the API semantics are not available in Hadoop1 profile. Below APIs are
                // used in the context of reading the stored tokens from UGI.
                // Credentials cred = Credentials.readTokenStorageFile(new File(fileLocation), config.hadoopConf);
                // loginUser.addCredentials(cred);
                try {
                    Method readTokenStorageFileMethod = Credentials.class.getMethod(
                            "readTokenStorageFile", File.class, org.apache.hadoop.conf.Configuration.class);
                    Credentials cred = (Credentials) readTokenStorageFileMethod.invoke(
                            null, new File(fileLocation), hadoopConfiguration);

                    // if UGI uses Kerberos keytabs for login, do not load HDFS delegation token since
                    // the UGI would prefer the delegation token instead, which eventually expires
                    // and does not fall back to using Kerberos tickets
                    Method getAllTokensMethod = Credentials.class.getMethod("getAllTokens");
                    Credentials credentials = new Credentials();
                    final Text hdfsDelegationTokenKind = new Text("HDFS_DELEGATION_TOKEN");
                    Collection<Token<? extends TokenIdentifier>> usrTok =
                            (Collection<Token<? extends TokenIdentifier>>) getAllTokensMethod.invoke(cred);
                    // if UGI uses a keytab for login, do not load the HDFS delegation token
                    for (Token<? extends TokenIdentifier> token : usrTok) {
                        if (!token.getKind().equals(hdfsDelegationTokenKind)) {
                            final Text id = new Text(token.getIdentifier());
                            credentials.addToken(id, token);
                        }
                    }
                    Method addCredentialsMethod = UserGroupInformation.class.getMethod(
                            "addCredentials", Credentials.class);
                    addCredentialsMethod.invoke(loginUser, credentials);
                } catch (NoSuchMethodException e) {
                    LOG.warn("Could not find method implementations in the shaded jar.", e);
                } catch (InvocationTargetException e) {
                    throw e.getTargetException();
                }
            }
        } else {
            // login with current user credentials (e.g. ticket cache, OS login)
            // note that the stored tokens are read automatically
            try {
                // Use reflection API to get the login user object
                // UserGroupInformation.loginUserFromSubject(null);
                Method loginUserFromSubjectMethod =
                        UserGroupInformation.class.getMethod("loginUserFromSubject", Subject.class);
                loginUserFromSubjectMethod.invoke(null, (Subject) null);
            } catch (NoSuchMethodException e) {
                LOG.warn("Could not find method implementations in the shaded jar.", e);
            } catch (InvocationTargetException e) {
                throw e.getTargetException();
            }
            loginUser = UserGroupInformation.getLoginUser();
        }

        boolean isCredentialsConfigured = HadoopUtils.isCredentialsConfigured(
                loginUser, securityConfig.useTicketCache());
        LOG.info("Hadoop user set to {}, credentials check status: {}", loginUser, isCredentialsConfigured);
    } catch (Throwable ex) {
        throw new SecurityInstallException("Unable to set the Hadoop login user", ex);
    }
}
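
Once this loginUser is in place, any component running in the same JVM can reuse it instead of performing its own login. A minimal sketch using the standard UserGroupInformation API (the FileSystem check is only an illustration):

import java.security.PrivilegedExceptionAction;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;

public class LoginUserExample {
    public static boolean pathExists(String path) throws Exception {
        // reuse the process-wide login user created by HadoopModule#install
        UserGroupInformation ugi = UserGroupInformation.getLoginUser();
        return ugi.doAs((PrivilegedExceptionAction<Boolean>) () ->
                FileSystem.get(new Configuration()).exists(new Path(path)));
    }
}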

JaasModule#install. It prepares the properties that would otherwise go into a JAAS file and first hands them to the ConfigFile; when a framework authenticates through JAAS, it extracts them from javax.security.auth.login.Configuration.

A JAAS file used for Kafka Kerberos authentication looks like this:

KafkaClient {
    com.sun.security.auth.module.Krb5LoginModule required
    keyTab="/Users/xx/kafka.keytab"
    principal="kafka/cdh002@TEST.COM"
    useKeyTab=true
    useTicketCache=true;
};

The parameters that the JAAS file would hold are written into javax.security.auth.login.Configuration; each framework fetches them from that configuration when needed.

public void install() {
    priorConfigFile = System.getProperty(JAVA_SECURITY_AUTH_LOGIN_CONFIG, null);
    // create an empty JAAS file and bind it to the java.security.auth.login.config system property
    if (priorConfigFile == null) {
        File configFile = generateDefaultConfigFile(workingDir);
        System.setProperty(JAVA_SECURITY_AUTH_LOGIN_CONFIG, configFile.getAbsolutePath());
        LOG.info("Jaas file will be created as {}.", configFile);
    }

    // read the JAAS configuration file (creates the ConfigFile)
    priorConfig = javax.security.auth.login.Configuration.getConfiguration();

    // construct a dynamic JAAS configuration
    currentConfig = new DynamicConfiguration(priorConfig);

    // wire up the configured JAAS login contexts to use the krb5 entries
    AppConfigurationEntry[] krb5Entries = getAppConfigurationEntries(securityConfig);
    if (krb5Entries != null) {
        for (String app : securityConfig.getLoginContextNames()) {
            currentConfig.addAppConfigurationEntry(app, krb5Entries); // e.g. KafkaClient
        }
    }

    // register as the process-wide javax.security.auth.login.Configuration
    javax.security.auth.login.Configuration.setConfiguration(currentConfig);
}
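
After JaasModule#install has run, the KafkaClient entry can be looked up through the standard JAAS API, which is exactly what the Kafka client does later. A small sketch to verify that the entry is present (standard javax.security.auth API):

import javax.security.auth.login.AppConfigurationEntry;
import javax.security.auth.login.Configuration;

public class JaasCheck {
    public static void main(String[] args) {
        AppConfigurationEntry[] entries =
                Configuration.getConfiguration().getAppConfigurationEntry("KafkaClient");
        if (entries == null) {
            System.out.println("No KafkaClient entry registered");
        } else {
            for (AppConfigurationEntry entry : entries) {
                // prints the login module (e.g. Krb5LoginModule) and its options
                System.out.println(entry.getLoginModuleName() + " " + entry.getOptions());
            }
        }
    }
}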

ZooKeeperModule#install sets the ZooKeeper client's SASL-related system properties:

public void install() throws SecurityInstallException {
    priorSaslEnable = System.getProperty(ZK_ENABLE_CLIENT_SASL, null);
    System.setProperty(ZK_ENABLE_CLIENT_SASL, String.valueOf(!securityConfig.isZkSaslDisable()));

    priorServiceName = System.getProperty(ZK_SASL_CLIENT_USERNAME, null);
    if (!"zookeeper".equals(securityConfig.getZooKeeperServiceName())) {
        System.setProperty(ZK_SASL_CLIENT_USERNAME, securityConfig.getZooKeeperServiceName());
    }

    priorLoginContextName = System.getProperty(ZK_LOGIN_CONTEXT_NAME, null);
    if (!"Client".equals(securityConfig.getZooKeeperLoginContextName())) {
        System.setProperty(ZK_LOGIN_CONTEXT_NAME, securityConfig.getZooKeeperLoginContextName());
    }
}
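
For reference, the constants above correspond to the ZooKeeper client's SASL system properties. Setting them by hand would look roughly like this (property names as used by the ZooKeeper client; the values shown are the defaults):

// equivalent of what ZooKeeperModule#install does with default settings
System.setProperty("zookeeper.sasl.client", "true");               // ZK_ENABLE_CLIENT_SASL
System.setProperty("zookeeper.sasl.client.username", "zookeeper"); // ZK_SASL_CLIENT_USERNAME
System.setProperty("zookeeper.sasl.clientconfig", "Client");       // ZK_LOGIN_CONTEXT_NAME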

After all modules have been loaded, the security context is installed via installContext. Only one installedContext ever exists.

static void installContext(SecurityConfiguration config) throws Exception {
    // install the security context factory
    for (String contextFactoryClass : config.getSecurityContextFactories()) {
        try {
            // loaded via SPI
            SecurityContextFactory contextFactory =
                    SecurityFactoryServiceLoader.findContextFactory(contextFactoryClass);
            // if a Hadoop environment is present, the Hadoop security context is chosen
            if (contextFactory.isCompatibleWith(config)) {
                try {
                    installedContext = contextFactory.createContext(config);
                    // install the first context that's compatible and ignore the remaining
                    break;
                } catch (SecurityContextInitializeException e) {
                    LOG.error("Cannot instantiate security context with: " + contextFactoryClass, e);
                }
            } else {
                LOG.warn("Unable to install incompatible security context factory {}", contextFactoryClass);
            }
        } catch (NoMatchSecurityFactoryException ne) {
            LOG.warn("Unable to instantiate security context factory {}", contextFactoryClass);
        }
    }
    if (installedContext == null) {
        LOG.error("Unable to install a valid security context factory!");
        throw new Exception("Unable to install a valid security context factory!");
    }
}

3. Reading Kerberos-Enabled Kafka from Flink

When Flink reads from Kerberos-enabled Kafka, the following configuration is required. Note that neither java.security.auth.login.config nor sasl.jaas.config is passed explicitly.

Parameters added to the Kafka properties:
    security.protocol = 'SASL_PLAINTEXT'      // use the SASL protocol
    sasl.mechanism = 'GSSAPI'                  // authenticate with Kerberos
    sasl.kerberos.service.name = 'kafka'       // Kafka service name

Parameters in flink-conf.yaml:
    security.kerberos.login.use-ticket-cache: true
    security.kerberos.login.keytab: xxxx
    security.kerberos.login.principal: xxxxx
    security.kerberos.login.contexts: KafkaClient,Client
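
Putting the Kafka side of this together, the consumer properties in the job would look roughly as follows (a sketch assuming the universal FlinkKafkaConsumer connector; broker address, group id and topic are placeholders):

import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class SecureKafkaSource {
    public static FlinkKafkaConsumer<String> build() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "broker-a:9092");  // placeholder
        props.setProperty("group.id", "flink-demo");               // placeholder
        // Kerberos over SASL; the JAAS entry itself comes from flink-conf.yaml via JaasModule
        props.setProperty("security.protocol", "SASL_PLAINTEXT");
        props.setProperty("sasl.mechanism", "GSSAPI");
        props.setProperty("sasl.kerberos.service.name", "kafka");
        return new FlinkKafkaConsumer<>("input-topic", new SimpleStringSchema(), props);
    }
}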

If sasl.jaas.config is configured instead, its format is:

String config = "com.sun.security.auth.module.Krb5LoginModule required\n" +
        "\tprincipal=\"kafka/xxxxx@EXAMPLE.COM\"\n" +
        "\tkeyTab=\"/Users/xxxx/kafka.keytab\"\n" +
        "\tuseKeyTab=true\n" +
        "\tuseTicketCache=true;";

The call chain through which the Flink Kafka connector picks up the JAAS configuration:

FlinkKafkaConsumerBase#open
  |
this.partitionDiscoverer.open();
  |
KafkaPartitionDiscoverer#initializeConnections
  |
KafkaConsumer#KafkaConsumer
  |
ClientUtils.createChannelBuilder(config);
  |
ChannelBuilders#clientChannelBuilder
  |
ChannelBuilders#create
  |
SaslChannelBuilder#configure (Kafka 0.10)
  |
JaasUtils#jaasConfig
public static Configuration jaasConfig(LoginType loginType, Map<String, ?> configs) {
    Password jaasConfigArgs = (Password) configs.get(SaslConfigs.SASL_JAAS_CONFIG);
    // if the sasl.jaas.config property is bound, build the JAAS config from its value
    if (jaasConfigArgs != null) {
        if (loginType == LoginType.SERVER)
            throw new IllegalArgumentException("JAAS config property not supported for server");
        else {
            JaasConfig jaasConfig = new JaasConfig(loginType, jaasConfigArgs.value());
            AppConfigurationEntry[] clientModules = jaasConfig.getAppConfigurationEntry(LoginType.CLIENT.contextName());
            int numModules = clientModules == null ? 0 : clientModules.length;
            if (numModules != 1)
                throw new IllegalArgumentException("JAAS config property contains " + numModules + " login modules, should be one module");
            return jaasConfig;
        }
    } else
        return defaultJaasConfig(loginType);
}

private static Configuration defaultJaasConfig(LoginType loginType) {
    // java.security.auth.login.config was already bound when the Flink JaasModule was installed
    String jaasConfigFile = System.getProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM);
    if (jaasConfigFile == null) {
        LOG.debug("System property '" + JaasUtils.JAVA_LOGIN_CONFIG_PARAM + "' and Kafka SASL property '" +
                SaslConfigs.SASL_JAAS_CONFIG + "' are not set, using default JAAS configuration.");
    }
    // fetch the javax.security.auth.login.Configuration that holds the JAAS login entries
    Configuration jaasConfig = Configuration.getConfiguration();
    // the login context name, e.g. KafkaClient
    String loginContextName = loginType.contextName();
    AppConfigurationEntry[] configEntries = jaasConfig.getAppConfigurationEntry(loginContextName);
    if (configEntries == null) {
        String errorMessage = "Could not find a '" + loginContextName + "' entry in the JAAS configuration. System property '" +
                JaasUtils.JAVA_LOGIN_CONFIG_PARAM + "' is " + (jaasConfigFile == null ? "not set" : jaasConfigFile);
        throw new IllegalArgumentException(errorMessage);
    }
    return jaasConfig;
}

4. Accessing Kerberos-Enabled Kafka Across Clusters

Goal: a Flink job runs on cluster A, which has Kerberos enabled, and reads Kafka data from both cluster A and cluster B.

Steps:

Upload the new cluster's krb5.conf and append its contents to the existing krb5 file.
Configure the sasl.jaas.config property separately for the connector.

Important notes:

In the newly uploaded krb5.conf, domain_realm must map the hosts to their realm; otherwise the default_realm is used.

[realms]
PLS.COM = {
    kdc = plscdh00:88
    admin_server = plscdh00
}

# map plscdh00 - plscdh03 to PLS.COM
[domain_realm]
.pls.com = PLS.COM
pls.com = PLS.COM
plscdh01 = PLS.COM
plscdh02 = PLS.COM
plscdh03 = PLS.COM
plscdh00 = PLS.COM

Append the domain_realm and realms sections of the newly uploaded krb5.conf to the krb5 file already in use.
Pass the JAAS content through sasl.jaas.config. If an AppConfigurationEntry were registered instead, Kafka's default login context name is KafkaClient, so with multiple Kafka clusters the entries retrieved under that name would get mixed up.
Related code:

Code that builds the merged krb5 file:

public class KrbConfManager {

    Logger LOG = LoggerFactory.getLogger(KrbConfManager.class);

    private static final String JAVA_SECURITY_KRB5_CONF = "java.security.krb5.conf";

    private KrbConfManager() {}

    public void appendKrbConf(String krbConf) {
        this.appendKrbConf(krbConf, System.getProperty("user.dir"));
    }

    /**
     * Parse the newly added krb5.conf with the methods of sun.security.krb5.Config
     * and append its sections to the krb5.conf currently in use.
     * @param krbConf path of the krb5.conf file to append
     */
    public void appendKrbConf(String krbConf, String directory) {
        String krbConfPath = DtFileUtils.getFileAbsolutePath(krbConf);
        LOG.info("krb conf abs path is {}", krbConfPath);
        Preconditions.checkArgument(DtFileUtils.fileExistCheck(krbConfPath), "krb file does not exist");
        try {
            Constructor<Config> constructor = Config.class.getDeclaredConstructor();
            constructor.setAccessible(true);
            Config configParser = constructor.newInstance();
            Method loadConfigFile = configParser.getClass().getDeclaredMethod("loadConfigFile", String.class);
            loadConfigFile.setAccessible(true);
            List<String> configFileList = (List<String>) loadConfigFile.invoke(configParser, krbConfPath);
            Method parseStanzaTable = configParser.getClass().getDeclaredMethod("parseStanzaTable", List.class);
            parseStanzaTable.setAccessible(true);
            Hashtable<String, Object> appendConfig = (Hashtable<String, Object>) parseStanzaTable.invoke(configParser, configFileList);
            Hashtable<String, Object> appendRealms = (Hashtable<String, Object>) appendConfig.get("realms");
            Hashtable<String, Object> appendDomainRealm = (Hashtable<String, Object>) appendConfig.get("domain_realm");

            Config instance = Config.getInstance();
            Field stanzaTable = instance.getClass().getDeclaredField("stanzaTable");
            stanzaTable.setAccessible(true);
            Hashtable<String, Object> currentTable = (Hashtable<String, Object>) stanzaTable.get(instance);
            Hashtable<String, Object> realms = (Hashtable<String, Object>) currentTable.computeIfAbsent("realms", key -> new Hashtable());
            realms.putAll(appendRealms);
            Hashtable<String, Object> domainRealm = (Hashtable<String, Object>) currentTable.computeIfAbsent("domain_realm", key -> new Hashtable());
            domainRealm.putAll(appendDomainRealm);

            StringBuffer stringBuffer = new StringBuffer();
            String newKerbConfigStr = buildKrbConfigStr(currentTable, stringBuffer);
            LOG.info("====buildKerbConf======\n{}", newKerbConfigStr);
            String krb5FilePath = DtFileUtils.createTempFile("krb-", ".conf", newKerbConfigStr, directory);
            System.setProperty(JAVA_SECURITY_KRB5_CONF, krb5FilePath);
            Config.refresh();
        } catch (Exception e) {
            throw new RuntimeException("build krb conf error", e);
        }
    }

    private String buildKrbConfigStr(Hashtable<String, Object> currentTable, StringBuffer stringBuilder) {
        Set<String> keySet = currentTable.keySet();
        for (String key : keySet) {
            stringBuilder = stringBuilder.append("[").append(key).append("]").append("\n");
            if (!StringUtils.equalsIgnoreCase(key, "realms")) {
                toStringInternal("", currentTable.get(key), stringBuilder);
            } else {
                dealRealms(currentTable.get(key), stringBuilder);
            }
        }
        return stringBuilder.toString();
    }

    private void dealRealms(Object realms, StringBuffer stringBuilder) {
        if (realms instanceof Hashtable) {
            Hashtable realmsTable = (Hashtable) realms;
            Iterator tabIterator = realmsTable.keySet().iterator();
            while (tabIterator.hasNext()) {
                Object entity = tabIterator.next();
                stringBuilder = stringBuilder.append(entity).append(" = ").append("{\n");
                toStringInternal("", realmsTable.get(entity), stringBuilder);
                stringBuilder.append("}\n");
            }
        }
    }

    private static void toStringInternal(String prefix, Object obj, StringBuffer sb) {
        if (obj instanceof String) {
            // A string value, just print it
            sb.append(obj).append('\n');
        } else if (obj instanceof Hashtable) {
            // A table, start a new sub-section...
            Hashtable<?, ?> tab = (Hashtable<?, ?>) obj;
            for (Object o : tab.keySet()) {
                // ...indent, print "key = ", and
                sb.append(prefix).append(" ").append(o).append(" = ");
                // ...go recursively into value
                toStringInternal(prefix + " ", tab.get(o), sb);
            }
            sb.append(prefix).append("\n");
        } else if (obj instanceof Vector) {
            // A vector of strings, print them inside [ and ]
            Vector<?> v = (Vector<?>) obj;
            boolean first = true;
            for (Object o : v.toArray()) {
                if (!first) {
                    sb.append(",");
                }
                sb.append(o);
                first = false;
            }
            sb.append("\n");
        }
    }

    private enum Singleton {
        INSTANCE;

        private final KrbConfManager instance;

        Singleton() {
            instance = new KrbConfManager();
        }

        private KrbConfManager getInstance() {
            return instance;
        }
    }

    public static KrbConfManager getInstance() {
        return Singleton.INSTANCE.getInstance();
    }
}
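
A minimal usage sketch for the class above (the path is a placeholder): after the call, java.security.krb5.conf points at the merged file and sun.security.krb5.Config has been refreshed, so realms of both clusters can be resolved in the same JVM.

// merge cluster B's realms / domain_realm into the current process-wide Kerberos configuration
KrbConfManager.getInstance().appendKrbConf("/opt/conf/b-cluster-krb5.conf");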

Building the sasl.jaas.config content and injecting it into the Kafka properties:

public interface SecurityManager {

    Logger LOG = LoggerFactory.getLogger(SecurityManager.class);

    String SASL_JAAS_CONFIG = "sasl.jaas.config";

    default void kerberosSecurity(KafkaSourceTableInfo kafkaSourceTableInfo, Properties props) {
        if (StringUtils.equalsIgnoreCase(kafkaSourceTableInfo.getKerberosAuthEnable(), Boolean.TRUE.toString())) {
            Optional.ofNullable(kafkaSourceTableInfo.getKrbConfName())
                    .ifPresent(KrbConfManager.getInstance()::appendKrbConf);
            String jaasContent = JaasConfigUtil.JaasConfig.builder()
                    .setLoginModule("com.sun.security.auth.module.Krb5LoginModule")
                    .setLoginModuleFlag("required")
                    .setPrincipal(DtStringUtil.addQuoteForStr(kafkaSourceTableInfo.getPrincipal()))
                    .setKeyTab(DtStringUtil.addQuoteForStr(DtFileUtils.getFileAbsolutePath(kafkaSourceTableInfo.getKeyTab())))
                    .setUseKeyTab(kafkaSourceTableInfo.getUseKeyTab())
                    .setUseTicketCache(kafkaSourceTableInfo.getUseTicketCache())
                    .build()
                    .generateJaasConfigStr();
            LOG.info(" kafka jaas Content: \n{}", jaasContent);
            props.put(SASL_JAAS_CONFIG, jaasContent);
        }
    }
}

Entry point: it must be called from open().

KafkaConsumer010#open

@Override
public void open(Configuration configuration) throws Exception {
    kerberosSecurity(kafkaSourceTableInfo, this.properties);
    super.open(configuration);
}
