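I am trying to implement an AuthenticateCallbackHandler, a feature slated for release in Kafka 2.0.0, but it has no effect. Is this a setup that should work?
In https://cwiki.apache.org/confluence/display/kafka/kip-86%3a+configurable+sasl+callback+handlers I read:
Use an external authentication server for SASL/PLAIN authentication using the SaslServer implementation included in Kafka
Define a new class that implements AuthenticateCallbackHandler and handles NameCallback and PlainAuthenticateCallback, and add the class to the broker's sasl.server.callback.handler.class property. A single instance of this callback handler will be created for the broker. The configured callback handler is responsible for validating the password provided by clients, and this may use an external authentication server.
So basically what I did was create a class: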
package com.example;

import javax.security.auth.callback.Callback;
import javax.security.auth.callback.NameCallback;
import javax.security.auth.callback.UnsupportedCallbackException;
import javax.security.auth.login.AppConfigurationEntry;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
import org.apache.kafka.common.security.plain.PlainAuthenticateCallback;

public class CustomAuthenticateCallbackHandler implements AuthenticateCallbackHandler {

    private List<AppConfigurationEntry> jaasConfigEntries;

    @Override
    public void configure(Map<String, ?> configs, String mechanism, List<AppConfigurationEntry> jaasConfigEntries) {
        // Keep the JAAS entries around; they are not used by this test handler.
        this.jaasConfigEntries = jaasConfigEntries;
    }

    @Override
    public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException {
        String username = null;
        for (Callback callback : callbacks) {
            if (callback instanceof NameCallback) {
                // The NameCallback carries the username sent by the client.
                username = ((NameCallback) callback).getDefaultName();
            } else if (callback instanceof PlainAuthenticateCallback) {
                // The PlainAuthenticateCallback carries the password and receives the result.
                PlainAuthenticateCallback plainCallback = (PlainAuthenticateCallback) callback;
                boolean authenticated = authenticate(username, plainCallback.password());
                plainCallback.authenticated(authenticated);
            } else {
                throw new UnsupportedCallbackException(callback);
            }
        }
    }

    // Hard-coded credentials for testing only; an external server would be queried here.
    protected boolean authenticate(String username, char[] password) throws IOException {
        return username != null && username.equals("test") && new String(password).equals("test");
    }

    @Override
    public void close() throws KafkaException {
    }
}
I then built a jar and made it available to Kafka, as shown in this docker-compose.yml:
version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:5.0.0-beta1-1
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
      ZOOKEEPER_SASL_ENABLED: "false"
  kafka:
    image: confluentinc/cp-kafka:5.0.0-beta1-1
    depends_on:
      - zookeeper
    volumes:
      - ./security:/etc/kafka/secrets
      - ./jars:/etc/kafka/jars
    ports:
      - "9092:9092"
    environment:
      CLASSPATH: /etc/kafka/jars/*
      ZOOKEEPER_SASL_ENABLED: "false"
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: SASL_PLAINTEXT://kafka:9092
      KAFKA_SECURITY_INTER_BROKER_PROTOCOL: SASL_PLAINTEXT
      KAFKA_SASL_ENABLED_MECHANISMS: PLAIN
      KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL: PLAIN
      KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.auth.SimpleAclAuthorizer
      KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: "false"
      KAFKA_SUPER_USERS: User:admin
      KAFKA_OPTS: -Djava.security.auth.login.config=/etc/kafka/secrets/broker_jaas.conf
      KAFKA_SASL_SERVER_CALLBACK_HANDLER_CLASS: com.example.CustomAuthenticateCallbackHandler
broker_jaas.conf:
KafkaServer {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="admin-secret"
    user_admin="admin-secret";
};
cli-client.properties:
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
username="test" \
password="test";
Then I tested it with the following:
> kafka-console-producer --broker-list kafka:9092 --topic test-topic --producer.config /etc/kafka/secrets/cli-client.properties
This is a message
This is another message
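However, I get an error saying that the producer failed to authenticate:
[2018-05-17 19:49:06,955] ERROR [Producer clientId=console-producer] Connection to node -1 failed authentication due to: Authentication failed: Invalid username or password (org.apache.kafka.clients.NetworkClient)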
1 Answer
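I got the following answer from the Kafka mailing list and will try it next week:
This feature will be part of the upcoming Kafka 2.0.0 release.
The PR is here: https://github.com/apache/kafka/pull/4890
The configuration is here: https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/config/saslconfigs.java#l57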
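
One thing worth checking against the linked configuration: the Kafka documentation for sasl.server.callback.handler.class states that server callback handlers must be prefixed with the listener prefix and the SASL mechanism name in lower case. The setup in the question uses the unprefixed key. Whether that explains the failure is not confirmed in this thread. The sketch below only shows how such a prefixed key would be assembled, assuming the listener is named SASL_PLAINTEXT (as in the docker-compose.yml above) and the mechanism is PLAIN. The class and method names are hypothetical and not part of Kafka.

```java
import java.util.Locale;

// Sketch only: builds the listener- and mechanism-prefixed property name
// described in the Kafka docs for sasl.server.callback.handler.class.
// CallbackHandlerConfigKey and serverCallbackHandlerKey are hypothetical names.
final class CallbackHandlerConfigKey {

    // Assumes the prefix has the form listener.name.<listener>.<mechanism>.
    // with both parts lower-cased.
    static String serverCallbackHandlerKey(String listenerName, String saslMechanism) {
        return "listener.name."
                + listenerName.toLowerCase(Locale.ROOT) + "."
                + saslMechanism.toLowerCase(Locale.ROOT) + "."
                + "sasl.server.callback.handler.class";
    }

    public static void main(String[] args) {
        // Listener name and mechanism are taken from the question's setup.
        String key = serverCallbackHandlerKey("SASL_PLAINTEXT", "PLAIN");
        // Prints the line that would go into the broker's properties.
        System.out.println(key + "=com.example.CustomAuthenticateCallbackHandler");
    }
}
```

With the values from the question this prints listener.name.sasl_plaintext.plain.sasl.server.callback.handler.class=com.example.CustomAuthenticateCallbackHandler. How that key maps onto an environment variable for the Confluent Docker image is not covered here and would need to be checked against that image's documentation.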