Unable to use sasl.jaas.config to test whether ACLs work with Testcontainers Kafka

x6yk4ghg  posted on 2021-06-04  in Kafka

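I am trying to use Testcontainers to test Kafka locally in some automated unit tests. I cannot get authorization testing to work.
My goal is to test that:
(1) If there are no ACLs in this test container, no KafkaProducer should be allowed to write to it. (Currently, even with no ACLs created, the producer can send to the topic as long as it is configured correctly. I thought setting the Kafka env variable allow.everyone.if.no.acl.found to false would do the trick, but that does not seem to be the case.)
(2) If a KafkaProducer does not use the correct sasl.jaas.config (i.e. the API key and password are incorrect), it is denied access to the test topic, even if an ACL is set for all principals.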
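For reference, the Confluent cp-kafka image derives broker properties from environment variables: the property name is upper-cased, dots become single underscores, and KAFKA_ is prefixed. The sketch below is plain Java with no Kafka dependency and shows that mapping for the properties involved here. The class and method names (BrokerEnv, toEnvName, aclEnv) are hypothetical. The authorizer.class.name entry and its value kafka.security.authorizer.AclAuthorizer are assumptions (Kafka 2.4 or later) and are not part of the original setup; they are included on the assumption that ACLs are only enforced when an authorizer is configured on the broker.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class BrokerEnv {
    // Convert a broker property name to the env variable name the cp-kafka image reads:
    // upper-case, '.' -> '_', '_' -> '__', '-' -> '___', prefixed with KAFKA_.
    static String toEnvName(String property) {
        StringBuilder sb = new StringBuilder("KAFKA_");
        for (char c : property.toCharArray()) {
            switch (c) {
                case '.': sb.append('_'); break;
                case '_': sb.append("__"); break;
                case '-': sb.append("___"); break;
                default: sb.append(Character.toUpperCase(c));
            }
        }
        return sb.toString();
    }

    // Env entries for ACL enforcement. The authorizer class is an assumption
    // (Kafka 2.4+), not something taken from the original post.
    static Map<String, String> aclEnv() {
        Map<String, String> env = new LinkedHashMap<>();
        env.put(toEnvName("authorizer.class.name"), "kafka.security.authorizer.AclAuthorizer");
        env.put(toEnvName("allow.everyone.if.no.acl.found"), "false");
        env.put(toEnvName("super.users"), "User:OnlySuperUser");
        return env;
    }

    public static void main(String[] args) {
        aclEnv().forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

Each entry could then be passed to the container with withEnv(key, value) before it is started.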
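Below is my code. I can get it to "work", but I have not figured out how to test the two scenarios above. I suspect I am not actually creating the ACLs: when I add a line after creating them ( adminClient.describeAcls(AclBindingFilter.ANY).values().get(); ) I get a No Authorizer is configured on the broker error. Judging from similar posts, I think this means no ACL bindings were created.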

import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.utility.DockerImageName;
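import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.KafkaAdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;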
import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.acl.AclPermissionType;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.resource.ResourceType;
import org.apache.kafka.common.serialization.StringSerializer;

        String topicName = "this-is-a-topic";
        String confluentVersion = "5.5.1";
        network = Network.newNetwork();
        String jaasTemplate = "org.apache.kafka.common.security.plain.PlainLoginModule required %s=\"%s\" %s=\"%s\";";
        String jaasConfig = String.format(jaasTemplate, "username", "apiKey", "password", "apiPassword");
        kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:" + confluentVersion))
                .withNetwork(network)
                .withEnv("KAFKA_AUTO_CREATE_TOPICS_ENABLE", "false")
                .withEnv("KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND", "false")
                .withEnv("KAFKA_SUPER_USERS", "User:OnlySuperUser")
                .withEnv("KAFKA_SASL_MECHANISM", "PLAIN")
                .withEnv("KAFKA_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM", "http")
                .withEnv("KAFKA_SASL_JAAS_CONFIG", jaasConfig);

        kafka.start();
        schemaRegistryContainer = new SchemaRegistryContainer(confluentVersion).withKafka(kafka);
        schemaRegistryContainer.start();

        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
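        properties.put("input.topic.name", topicName);
        properties.put("input.topic.partitions", "1");
        // Read back when the topic is created; without it Short.parseShort(null) throws
        properties.put("input.topic.replication.factor", "1");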
        adminClient = KafkaAdminClient.create(properties);
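        // Allow WRITE on the topic for every principal, from any host
        AclBinding ACL = new AclBinding(new ResourcePattern(ResourceType.TOPIC, topicName, PatternType.LITERAL),
                new AccessControlEntry( "User:*", "*", AclOperation.WRITE, AclPermissionType.ALLOW));
        // Note: the returned futures are never awaited, so a failed ACL creation goes unnoticed here
        var acls = adminClient.createAcls(List.of(ACL)).values();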

        List<NewTopic> topics = new ArrayList<>();
        topics.add(
                new NewTopic(topicName,
                        Integer.parseInt(properties.getProperty("input.topic.partitions")),
                        Short.parseShort(properties.getProperty("input.topic.replication.factor")))
        );
        adminClient.createTopics(topics);

        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        props.put("input.topic.name", topicName);
        props.put("security.protocol", "PLAINTEXT");
        props.put("input.topic.partitions", "1");
        props.put("input.topic.replication.factor", "1");
        props.put("metadata.fetch.timeout.ms", "10000");
        props.put("sasl.jaas.config", jaasConfig);

        producer = new KafkaProducer<>(props);

        String key = "testContainers";
        String value = "AreAwesome";
        ProducerRecord<String, String> record = new ProducerRecord<>(
                        (String) props.get("input.topic.name"), key, value);
        try {
             RecordMetadata o = (RecordMetadata) producer.send(record).get();
             System.out.println(o.toString());
        } catch (Exception e) {
             e.printStackTrace();
        }
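
One thing to note in the producer settings above: with security.protocol set to PLAINTEXT, the client performs no SASL authentication, so sasl.jaas.config has no effect and a wrong password cannot be rejected. Below is a minimal sketch of client properties that would actually send the credentials. It assumes the broker exposes a SASL_PLAINTEXT listener with the PLAIN mechanism, which the container configuration above does not appear to set up. The class SaslClientProps, its methods, and the address localhost:9093 are hypothetical.

```java
import java.util.Properties;

class SaslClientProps {
    // Build a JAAS entry for the PLAIN login module, in the same format as the jaasTemplate above.
    static String plainJaas(String username, String password) {
        return String.format(
                "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"%s\" password=\"%s\";",
                username, password);
    }

    // Client properties that make the Kafka client authenticate via SASL/PLAIN.
    static Properties forUser(String bootstrapServers, String username, String password) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("security.protocol", "SASL_PLAINTEXT"); // PLAINTEXT would skip SASL entirely
        props.put("sasl.mechanism", "PLAIN");
        props.put("sasl.jaas.config", plainJaas(username, password));
        return props;
    }

    public static void main(String[] args) {
        // Hypothetical address; in the test it would come from kafka.getBootstrapServers()
        Properties good = forUser("localhost:9093", "apiKey", "apiPassword");
        Properties bad = forUser("localhost:9093", "apiKey", "wrongPassword");
        System.out.println(good.getProperty("sasl.jaas.config"));
        System.out.println(bad.getProperty("sasl.jaas.config"));
    }
}
```

Building one Properties object with the correct password and one with a wrong password would let scenario (2) be written as two producers against the same broker, differing only in their credentials.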
