I am trying to use Testcontainers to test Kafka locally in some automated unit tests, but I cannot get authorization testing to work.

My goal is to test:
(1) that if no ACLs exist in this test container, a KafkaProducer should not be allowed to write to it (currently, even with no ACLs created, the producer can still send to the topic as long as it is configured correctly; I thought setting the Kafka env variable allow.everyone.if.no.acl.found to false would take care of that, but that does not seem to be the case), and
(2) that a KafkaProducer which does not use the correct sasl.jaas.config (i.e. whose apikey and password are wrong) is denied access to the test topic, even when an ACL is set for all principals.
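Roughly, the assertions I would like to end up with look something like the sketch below. This is only a sketch: producerWithoutAcl, producerWithWrongCredentials and record are placeholders, and I am assuming JUnit 5 and that an authorization failure surfaces as an ExecutionException from send(...).get().

import java.util.concurrent.ExecutionException;
import org.junit.jupiter.api.Assertions;

// (1) no ACLs and allow.everyone.if.no.acl.found=false: the write should be rejected
Assertions.assertThrows(ExecutionException.class,
        () -> producerWithoutAcl.send(record).get());

// (2) wrong apikey/password in sasl.jaas.config: the write should be rejected
// even though an ACL exists for all principals
Assertions.assertThrows(ExecutionException.class,
        () -> producerWithWrongCredentials.send(record).get());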
Below is my code. I can get it to "work", but I have not figured out how to test the two cases above. I suspect I may not actually be creating the ACLs at all: if I add a line right after creating the ACL, such as adminClient.describeAcls(AclBindingFilter.ANY).values().get();, I get a No Authorizer is configured on the broker error (see the snippet after the code below). Based on similar posts, I believe that means no ACL binding was created.
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.utility.DockerImageName;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.KafkaAdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.acl.AclPermissionType;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.resource.ResourceType;
import org.apache.kafka.common.serialization.StringSerializer;
String topicName = "this-is-a-topic";
String confluentVersion = "5.5.1";
Network network = Network.newNetwork();
String jaasTemplate = "org.apache.kafka.common.security.plain.PlainLoginModule required %s=\"%s\" %s=\"%s\";";
String jaasConfig = String.format(jaasTemplate, "username", "apiKey", "password", "apiPassword");
KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:" + confluentVersion))
.withNetwork(network)
.withEnv("KAFKA_AUTO_CREATE_TOPICS_ENABLE", "false")
.withEnv("KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND", "false")
.withEnv("KAFKA_SUPER_USERS", "User:OnlySuperUser")
.withEnv("KAFKA_SASL_MECHANISM", "PLAIN")
.withEnv("KAFKA_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM", "http")
.withEnv("KAFKA_SASL_JAAS_CONFIG", jaasConfig);
kafka.start();
SchemaRegistryContainer schemaRegistryContainer = new SchemaRegistryContainer(confluentVersion).withKafka(kafka); // custom helper container (not shown here)
schemaRegistryContainer.start();
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
properties.put("input.topic.name", topicName);
properties.put("input.topic.partitions", "1");
properties.put("input.topic.replication.factor", "1"); // read below when creating the topic

AdminClient adminClient = KafkaAdminClient.create(properties);
// Allow WRITE on the test topic for all principals, from any host
AclBinding acl = new AclBinding(new ResourcePattern(ResourceType.TOPIC, topicName, PatternType.LITERAL),
        new AccessControlEntry("User:*", "*", AclOperation.WRITE, AclPermissionType.ALLOW));
var acls = adminClient.createAcls(List.of(acl)).values();
List<NewTopic> topics = new ArrayList<>();
topics.add(
        new NewTopic(topicName,
                Integer.parseInt(properties.getProperty("input.topic.partitions")),
                Short.parseShort(properties.getProperty("input.topic.replication.factor")))
);
adminClient.createTopics(topics);
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put("input.topic.name", topicName);
props.put("security.protocol", "PLAINTEXT");
props.put("input.topic.partitions", "1");
props.put("input.topic.replication.factor", "1");
props.put("metadata.fetch.timeout.ms", "10000");
props.put("sasl.jaas.config", jaasConfig);

KafkaProducer<String, String> producer = new KafkaProducer<>(props);
String key = "testContainers";
String value = "AreAwesome";

ProducerRecord<String, String> record = new ProducerRecord<>(
        (String) props.get("input.topic.name"), key, value);

try {
    RecordMetadata o = producer.send(record).get();
    System.out.println(o.toString());
} catch (Exception e) {
    e.printStackTrace();
}
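For reference, this is the describeAcls check mentioned above; adding it directly after the createAcls(...) call is what triggers the error:

import org.apache.kafka.common.acl.AclBindingFilter;

// Added right after adminClient.createAcls(...); this call fails with:
//   "No Authorizer is configured on the broker"
adminClient.describeAcls(AclBindingFilter.ANY).values().get();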