Unable to create multiple Kafka binders with SSL configuration

rkttyhzu · posted 2021-06-04 in Kafka

I am trying to connect to Kafka clusters over the SASL_SSL protocol with a JAAS config, as follows:

spring:
  cloud:
    stream:
      bindings:
        binding-1:
          binder: kafka-1-with-ssl
          destination: <destination-1>
          content-type: text/plain
          group: <group-id-1>
          consumer:
            header-mode: headers
        binding-2:
          binder: kafka-2-with-ssl
          destination: <destination-2>
          content-type: text/plain
          group: <group-id-2>
          consumer:
            header-mode: headers
      binders:
        kafka-1-with-ssl:
          type: kafka
          defaultCandidate: false
          environment:
            spring:
              cloud:
                stream:
                  kafka:
                    binder:
                      brokers: <broker-hostnames-1>
                      configuration:
                        ssl:
                          truststore:
                            location: <location-1>
                            password: <ts-password-1>
                            type: JKS
                      jaas:
                        loginModule: org.apache.kafka.common.security.scram.ScramLoginModule
                        options:
                          username: <username-1>
                          password: <password-1>
        kafka-2-with-ssl:
          type: kafka
          defaultCandidate: false
          environment:
            spring:
              cloud:
                stream:
                  kafka:
                    binder:
                      brokers: <broker-hostnames-2>
                      configuration:
                        ssl:
                          truststore:
                            location: <location-2>
                            password: <ts-password-2>
                            type: JKS
                      jaas:
                        loginModule: org.apache.kafka.common.security.scram.ScramLoginModule
                        options:
                          username: <username-2>
                          password: <password-2>
      kafka:
        binder:
          configuration:
            security:
              protocol: SASL_SSL
            sasl:
              mechanism: SCRAM-SHA-256

The configuration above is in line with the sample configuration provided in the official Spring Cloud Stream git repo.
A similar issue raised on the library's git repo says it was fixed in the latest versions, but that does not appear to be the case. I get the error below.
Spring Boot version: 2.2.8, Spring Cloud Stream dependencies version: Horsham.SR6.

Failed to create consumer binding; retrying in 30 seconds | org.springframework.cloud.stream.binder.BinderException: Exception thrown while starting consumer:
    at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.doBindConsumer(AbstractMessageChannelBinder.java:461)
    at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.doBindConsumer(AbstractMessageChannelBinder.java:90)
    at org.springframework.cloud.stream.binder.AbstractBinder.bindConsumer(AbstractBinder.java:143)
    at org.springframework.cloud.stream.binding.BindingService.lambda$rescheduleConsumerBinding$1(BindingService.java:201)
    at org.springframework.cloud.sleuth.instrument.async.TraceRunnable.run(TraceRunnable.java:68)
    at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.KafkaException: Failed to create new KafkaAdminClient
    at org.apache.kafka.clients.admin.KafkaAdminClient.createInternal(KafkaAdminClient.java:407)
    at org.apache.kafka.clients.admin.AdminClient.create(AdminClient.java:65)
    at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.createAdminClient(KafkaTopicProvisioner.java:246)
    at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.doProvisionConsumerDestination(KafkaTopicProvisioner.java:216)
    at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.provisionConsumerDestination(KafkaTopicProvisioner.java:183)
    at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.provisionConsumerDestination(KafkaTopicProvisioner.java:79)
    at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.doBindConsumer(AbstractMessageChannelBinder.java:402)
    ... 12 common frames omitted
Caused by: org.apache.kafka.common.KafkaException: javax.security.auth.login.LoginException: KrbException: Cannot locate default realm
    at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:160)
    at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:146)
    at org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:67)
    at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:99)
    at org.apache.kafka.clients.admin.KafkaAdminClient.createInternal(KafkaAdminClient.java:382)
    ... 18 common frames omitted
Caused by: javax.security.auth.login.LoginException: KrbException: Cannot locate default realm
    at com.sun.security.auth.module.Krb5LoginModule.attemptAuthentication(Krb5LoginModule.java:804)
    at com.sun.security.auth.module.Krb5LoginModule.login(Krb5LoginModule.java:617)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at javax.security.auth.login.LoginContext.invoke(LoginContext.java:755)
    at javax.security.auth.login.LoginContext.access$000(LoginContext.java:195)
    at javax.security.auth.login.LoginContext$4.run(LoginContext.java:682)
    at javax.security.auth.login.LoginContext$4.run(LoginContext.java:680)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.login.LoginContext.invokePriv(LoginContext.java:680)
    at javax.security.auth.login.LoginContext.login(LoginContext.java:587)
    at org.apache.kafka.common.security.authenticator.AbstractLogin.login(AbstractLogin.java:60)
    at org.apache.kafka.common.security.authenticator.LoginManager.<init>(LoginManager.java:61)
    at org.apache.kafka.common.security.authenticator.LoginManager.acquireLoginManager(LoginManager.java:111)
    at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:149)
    ... 22 common frames omitted
Caused by: sun.security.krb5.RealmException: KrbException: Cannot locate default realm
    at sun.security.krb5.Realm.getDefault(Realm.java:68)
    at sun.security.krb5.PrincipalName.<init>(PrincipalName.java:462)
    at sun.security.krb5.PrincipalName.<init>(PrincipalName.java:471)
    at com.sun.security.auth.module.Krb5LoginModule.attemptAuthentication(Krb5LoginModule.java:706)
    ... 38 common frames omitted
Caused by: sun.security.krb5.KrbException: Cannot locate default realm
    at sun.security.krb5.Config.getDefaultRealm(Config.java:1029)
    at sun.security.krb5.Realm.getDefault(Realm.java:64)
    ... 41 common frames omitted

This makes me think the library is not picking up the configuration props correctly: jaas.loginModule is specified as ScramLoginModule, yet authentication goes through Krb5LoginModule, which is the module Kafka falls back to when sasl.mechanism is left at its GSSAPI default.
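My suspicion (untested) is that, because both binders declare defaultCandidate: false, the global spring.cloud.stream.kafka.binder.configuration block may not be applied inside their isolated environments, so the AdminClient created per binder never sees security.protocol or sasl.mechanism. A minimal sketch of what repeating those two properties inside a binder's own environment would look like, shown for kafka-1-with-ssl only, with the ssl and jaas sections kept as above:

        kafka-1-with-ssl:
          type: kafka
          defaultCandidate: false
          environment:
            spring:
              cloud:
                stream:
                  kafka:
                    binder:
                      brokers: <broker-hostnames-1>
                      configuration:
                        # repeated per binder because the global binder
                        # section may not reach this isolated environment
                        security:
                          protocol: SASL_SSL
                        sasl:
                          mechanism: SCRAM-SHA-256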
However, surprisingly, when it is configured as follows (the difference being the last section, which places the SSL credentials outside the binder's environment), it connects to the binder whose credentials are given in the global SSL props and silently ignores the other binder, without logging any error.
That is, if the credentials of the binder kafka-2-with-ssl are specified in the global SSL props, that binder gets created and the binding subscribed to it starts consuming events. But this only helps when a single binder needs to be created.

spring:
  cloud:
    stream:
      bindings:
        binding-1:
          binder: kafka-1-with-ssl
          destination: <destination-1>
          content-type: text/plain
          group: <group-id-1>
          consumer:
            header-mode: headers
        binding-2:
          binder: kafka-2-with-ssl
          destination: <destination-2>
          content-type: text/plain
          group: <group-id-2>
          consumer:
            header-mode: headers
      binders:
        kafka-1-with-ssl:
          type: kafka
          defaultCandidate: false
          environment:
            spring:
              cloud:
                stream:
                  kafka:
                    binder:
                      brokers: <broker-hostnames-1>
                      configuration:
                        ssl:
                          truststore:
                            location: <location-1>
                            password: <ts-password-1>
                            type: JKS
                      jaas:
                        loginModule: org.apache.kafka.common.security.scram.ScramLoginModule
                        options:
                          username: <username-1>
                          password: <password-1>
        kafka-2-with-ssl:
          type: kafka
          defaultCandidate: false
          environment:
            spring:
              cloud:
                stream:
                  kafka:
                    binder:
                      brokers: <broker-hostnames-2>
                      configuration:
                        ssl:
                          truststore:
                            location: <location-2>
                            password: <ts-password-2>
                            type: JKS
                      jaas:
                        loginModule: org.apache.kafka.common.security.scram.ScramLoginModule
                        options:
                          username: <username-2>
                          password: <password-2>
      kafka:
        binder:
          configuration:
            security:
              protocol: SASL_SSL
            sasl:
              mechanism: SCRAM-SHA-256
            ssl:
              truststore:
                location: <location-2>
                password: <ts-password-2>
                type: JKS
          jaas:
            loginModule: org.apache.kafka.common.security.scram.ScramLoginModule
            options:
              username: <username-2>
              password: <password-2>

Rest assured there is nothing wrong with the SSL credentials. This has been tested thoroughly by creating either SSL Kafka binder on its own, and each one is created successfully in isolation. The intent is to connect to multiple Kafka binders over the SASL_SSL protocol. Thanks in advance.
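For completeness, a variant I have not yet verified against Horsham.SR6: making each binder fully self-contained by passing the SASL settings as plain Kafka client properties (including sasl.jaas.config) in the binder environment's configuration map, instead of relying on the binder-level jaas section or the global block. The property names below are standard Kafka client keys; whether the binder forwards them to the AdminClient it creates is exactly what is in question here. Sketch for kafka-1-with-ssl (kafka-2-with-ssl would be analogous):

        kafka-1-with-ssl:
          type: kafka
          defaultCandidate: false
          environment:
            spring:
              cloud:
                stream:
                  kafka:
                    binder:
                      brokers: <broker-hostnames-1>
                      configuration:
                        security:
                          protocol: SASL_SSL
                        sasl:
                          mechanism: SCRAM-SHA-256
                          jaas:
                            # binds to the sasl.jaas.config client property,
                            # replacing the binder-level jaas section
                            config: 'org.apache.kafka.common.security.scram.ScramLoginModule required username="<username-1>" password="<password-1>";'
                        ssl:
                          truststore:
                            location: <location-1>
                            password: <ts-password-1>
                            type: JKS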
