kafka流异常:groupauthorizationexception

7rtdyuoh  于 2021-06-06  发布在  Kafka
关注(0)|答案(2)|浏览(1021)

我正在开发一个kafka流应用程序,它将读取来自输入kafka主题的消息,过滤不需要的数据,并推送输出kafka主题。
Kafka流配置:

@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public KafkaStreamsConfiguration kStreamsConfigs() {

    Map<String, Object> streamsConfiguration = new HashMap<>();
    streamsConfiguration.put(ConsumerConfig.GROUP_ID_CONFIG, "abcd");
    streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "QC-NormalizedEventProcessor-v1.0.0");
    streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "test:9072");
    streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    streamsConfiguration.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
    streamsConfiguration.put(StreamsConfig.producerPrefix(ProducerConfig.ACKS_CONFIG), -1);
    streamsConfiguration.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
    streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
    streamsConfiguration.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, kafkaConsumerProperties.getConsumerJKSFileLocation());
    streamsConfiguration.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, kafkaConsumerProperties.getConsumerJKSPwd());
    streamsConfiguration.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
    streamsConfiguration.put(SASL_MECHANISM, "PLAIN");

    return new KafkaStreamsConfiguration(streamsConfiguration);
}

kstream筛选器逻辑:

@Bean
public KStream<String, String> kStreamJson(StreamsBuilder builder) {

    KStream<String, String> stream = builder.stream(kafkaConsumerProperties.getConsumerTopic(), Consumed.with(Serdes.String(), Serdes.String()));
    /**Printing the source message */
    stream.foreach((key, value) -> LOGGER.info(THREAD_NO + Thread.currentThread().getId() + "*****Message From Input Topic: " + key + ": " + value));
    KStream<String, String> filteredDocument = stream.filter((k, v) -> filterCondition.test(k, v));

    filteredDocument.to(kafkaConsumerProperties.getProducerTopic(), Produced.with(Serdes.String(), Serdes.String()));
    /**After filtering printing the same message */
    filteredDocument.foreach((key, value) -> LOGGER.info(THREAD_NO + Thread.currentThread().getId() + " #####Filtered Document: " + key + ": " + value));
    return stream;
}

当我从基于spring的kafka流应用程序开始时,我遇到了以下异常。

2019-05-27T07:58:36.018-0500 ERROR stream-thread [QC-NormalizedEventProcessor-v1.0.0-e9cb1bed-3d90-41f1-957a-4fc7efc12a02-StreamThread-1] Encountered the following unexpected Kafka exception during processing, this usually indicate Streams internal errors:
org.apache.kafka.common.errors.GroupAuthorizationException: Not authorized to access group: QC-NormalizedEventProcessor-v1.0.0

我们的kafka infra团队对“group.id”给予了必要的许可,使用相同的“group id”,我可以使用其他kafka消费者应用程序使用消息,并且我在“application.id”中按照我的意愿使用了name。我们没有在Kafka访问控制列表中添加/更新“application.id”。
我真的不确定我们是否需要为“application.id”授予任何权限,或者我在kafka流配置中遗漏了一些内容。请给我建议。
请注意:我尝试过在kafka流配置中使用with“group.id”和without“group.id”,每次都会遇到相同的异常。
谢谢!bharathiraja shanmugam先生

pkwftd7m

pkwftd7m1#

我不在办公桌上,但我认为streams将group.id设置为application.id。

oalqel3c

oalqel3c2#

我们还需要为application.id设置访问权限。有关更多信息,请参阅->https://docs.confluent.io/current/streams/developer-guide/security.html
安全kafka群集所需的acl设置
kafka集群可以使用acl来控制对资源的访问(比如创建主题的能力),对于这样的集群,每个客户端(包括kafka流)都需要作为特定的用户进行身份验证,以便获得适当的访问权限。特别是,当针对安全kafka集群运行streams应用程序时,运行该应用程序的主体必须设置acl,以便该应用程序具有创建内部主题的权限。
由于所有内部主题以及嵌入的使用者组名称都以应用程序id作为前缀,建议使用前缀资源模式上的ACL来配置控制列表,以允许客户端管理以该前缀开头的所有主题和使用者组,如--resource pattern type prefixed--topic--operation all(有关详细信息,请参阅kip-277和kip-290)。
例如,给定streams应用程序的以下设置:
•配置 application.id 价值是 team1-streams-app1 . • 以kafka群集身份验证 team1 用户。•应用程序的编码拓扑读取输入主题 input-topic1 以及 input-topic2 . • 应用程序的拓扑结构写入到输出主题 output-topic1 以及 output-topic2 .
然后,以下命令将在kafka集群中创建必要的acl,以允许您的应用程序运行:


# Allow Streams to read the input topics:

bin/kafka-acls ... --add --allow-principal User:team1 --operation Read --topic input-topic1 --topic input-topic2

# Allow Streams to write to the output topics:

bin/kafka-acls ... --add --allow-principal User:team1 --operation Write --topic output-topic1 --topic output-topic2

# Allow Streams to manage its own internal topics and consumer groups:

bin/kafka-acls ... --add --allow-principal User:team1 --operation All --resource-pattern-type prefixed --topic team1-streams-app1 --group team1-streams-app1

相关问题