kafka自定义authenticatecallbackhandler

jdg4fx2g  于 2021-06-06  发布在  Kafka
关注(0)|答案(1)|浏览(610)

我试图实现一个authenticatecallbackhandler,准备在kafka 2.0.0中发布,但没有效果-这是一个应该工作的设置吗?
在https://cwiki.apache.org/confluence/display/kafka/kip-86%3a+configurable+sasl+callback+handlers 我读到:
使用kafka中包含的saslserver实现进行sasl/plain身份验证时使用外部身份验证服务器
定义一个实现authenticatecallbackhandler的新类,该类处理namecallback和plainauthenticatecallback,并将该类添加到代理的sasl.server.callback.handler.class属性中。将为代理创建此回调处理程序的单个示例。配置的回调处理程序负责验证客户端提供的密码,这可能使用外部身份验证服务器。
所以基本上我所做的就是创建一个类:

  1. package com.example;
  2. import javax.security.auth.callback.Callback;
  3. import javax.security.auth.callback.NameCallback;
  4. import javax.security.auth.callback.UnsupportedCallbackException;
  5. import javax.security.auth.login.AppConfigurationEntry;
  6. import java.io.IOException;
  7. import java.util.List;
  8. import java.util.Map;
  9. import org.apache.kafka.common.KafkaException;
  10. import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
  11. import org.apache.kafka.common.security.plain.PlainAuthenticateCallback;
  12. public class CustomAuthenticateCallbackHandler implements AuthenticateCallbackHandler {
  13. private List<AppConfigurationEntry> jaasConfigEntries;
  14. public void configure(Map<String, ?> configs, String mechanism, List<AppConfigurationEntry> jaasConfigEntries) {
  15. this.jaasConfigEntries = jaasConfigEntries;
  16. }
  17. public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException {
  18. String username = null;
  19. for (Callback callback: callbacks) {
  20. if (callback instanceof NameCallback)
  21. username = ((NameCallback) callback).getDefaultName();
  22. else if (callback instanceof PlainAuthenticateCallback) {
  23. PlainAuthenticateCallback plainCallback = (PlainAuthenticateCallback) callback;
  24. boolean authenticated = authenticate(username, plainCallback.password());
  25. plainCallback.authenticated(authenticated);
  26. } else
  27. throw new UnsupportedCallbackException(callback);
  28. }
  29. }
  30. protected boolean authenticate(String username, char[] password) throws IOException {
  31. return username != null && username.equals("test") && new String(password).equals("test");
  32. }
  33. public void close() throws KafkaException {
  34. }
  35. }

构建一个jar并使其可供kafka使用,如docker-compose.yml中所示:

  1. version: '3'
  2. services:
  3. zookeeper:
  4. image: confluentinc/cp-zookeeper:5.0.0-beta1-1
  5. ports:
  6. - "2181:2181"
  7. environment:
  8. ZOOKEEPER_CLIENT_PORT: 2181
  9. ZOOKEEPER_TICK_TIME: 2000
  10. ZOOKEEPER_SASL_ENABLED: "false"
  11. kafka:
  12. image: confluentinc/cp-kafka:5.0.0-beta1-1
  13. depends_on:
  14. - zookeeper
  15. volumes:
  16. - ./security:/etc/kafka/secrets
  17. - ./jars:/etc/kafka/jars
  18. ports:
  19. - "9092:9092"
  20. environment:
  21. CLASSPATH: /etc/kafka/jars/*
  22. ZOOKEEPER_SASL_ENABLED: "false"
  23. KAFKA_BROKER_ID: 1
  24. KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
  25. KAFKA_ADVERTISED_LISTENERS: SASL_PLAINTEXT://kafka:9092
  26. KAFKA_SECURITY_INTER_BROKER_PROTOCOL: SASL_PLAINTEXT
  27. KAFKA_SASL_ENABLED_MECHANISMS: PLAIN
  28. KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL: PLAIN
  29. KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.auth.SimpleAclAuthorizer
  30. KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: "false"
  31. KAFKA_SUPER_USERS: User:admin
  32. KAFKA_OPTS: -Djava.security.auth.login.config=/etc/kafka/secrets/broker_jaas.conf
  33. KAFKA_SASL_SERVER_CALLBACK_HANDLER_CLASS: com.example.CustomAuthenticateCallbackHandler

经纪人\ jaas.conf:

  1. KafkaServer {
  2. org.apache.kafka.common.security.plain.PlainLoginModule required
  3. username="admin"
  4. password="admin-secret"
  5. user_admin="admin-secret"
  6. ;
  7. };

cli-client.properties属性:

  1. security.protocol=SASL_PLAINTEXT
  2. sasl.mechanism=PLAIN
  3. sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
  4. username="test" \
  5. password="test";

然后我用以下方法进行测试:

  1. > kafka-console-producer --broker-list kafka:9092 --topic test-topic --producer.config /etc/kafka/secrets/cli-client.properties
  2. This is a message
  3. This is another message

但是,我得到一个生产者无法验证的错误:
[2018-05-17 19:49:06955]错误[producer clientid=console producer]连接到节点-1身份验证失败,原因:身份验证失败:用户名或密码无效(org.apache.kafka.clients.networkclient)

pdsfdshx

pdsfdshx1#

我从Kafka的邮件列表中得到了以下答案,将在下周尝试:
这个特性将是即将发布的Kafka2.0.0版本的一部分。
公关部来了:https://github.com/apache/kafka/pull/4890
此处配置:https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/config/saslconfigs.java#l57

相关问题