我正在开发一个kafka流应用程序,它将读取来自输入kafka主题的消息,过滤不需要的数据,并推送输出kafka主题。
Kafka流配置:
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public KafkaStreamsConfiguration kStreamsConfigs() {
Map<String, Object> streamsConfiguration = new HashMap<>();
streamsConfiguration.put(ConsumerConfig.GROUP_ID_CONFIG, "abcd");
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "QC-NormalizedEventProcessor-v1.0.0");
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "test:9072");
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
streamsConfiguration.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
streamsConfiguration.put(StreamsConfig.producerPrefix(ProducerConfig.ACKS_CONFIG), -1);
streamsConfiguration.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
streamsConfiguration.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, kafkaConsumerProperties.getConsumerJKSFileLocation());
streamsConfiguration.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, kafkaConsumerProperties.getConsumerJKSPwd());
streamsConfiguration.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
streamsConfiguration.put(SASL_MECHANISM, "PLAIN");
return new KafkaStreamsConfiguration(streamsConfiguration);
}
kstream筛选器逻辑:
@Bean
public KStream<String, String> kStreamJson(StreamsBuilder builder) {
KStream<String, String> stream = builder.stream(kafkaConsumerProperties.getConsumerTopic(), Consumed.with(Serdes.String(), Serdes.String()));
/**Printing the source message */
stream.foreach((key, value) -> LOGGER.info(THREAD_NO + Thread.currentThread().getId() + "*****Message From Input Topic: " + key + ": " + value));
KStream<String, String> filteredDocument = stream.filter((k, v) -> filterCondition.test(k, v));
filteredDocument.to(kafkaConsumerProperties.getProducerTopic(), Produced.with(Serdes.String(), Serdes.String()));
/**After filtering printing the same message */
filteredDocument.foreach((key, value) -> LOGGER.info(THREAD_NO + Thread.currentThread().getId() + " #####Filtered Document: " + key + ": " + value));
return stream;
}
当我从基于spring的kafka流应用程序开始时,我遇到了以下异常。
2019-05-27T07:58:36.018-0500 ERROR stream-thread [QC-NormalizedEventProcessor-v1.0.0-e9cb1bed-3d90-41f1-957a-4fc7efc12a02-StreamThread-1] Encountered the following unexpected Kafka exception during processing, this usually indicate Streams internal errors:
org.apache.kafka.common.errors.GroupAuthorizationException: Not authorized to access group: QC-NormalizedEventProcessor-v1.0.0
我们的kafka infra团队对“group.id”给予了必要的许可,使用相同的“group id”,我可以使用其他kafka消费者应用程序使用消息,并且我在“application.id”中按照我的意愿使用了name。我们没有在Kafka访问控制列表中添加/更新“application.id”。
我真的不确定我们是否需要为“application.id”授予任何权限,或者我在kafka流配置中遗漏了一些内容。请给我建议。
请注意:我尝试过在kafka流配置中使用with“group.id”和without“group.id”,每次都会遇到相同的异常。
谢谢!bharathiraja shanmugam先生
2条答案
按热度按时间pkwftd7m1#
我不在办公桌上,但我认为streams将group.id设置为application.id。
oalqel3c2#
我们还需要为application.id设置访问权限。有关更多信息,请参阅->https://docs.confluent.io/current/streams/developer-guide/security.html
安全kafka群集所需的acl设置
kafka集群可以使用acl来控制对资源的访问(比如创建主题的能力),对于这样的集群,每个客户端(包括kafka流)都需要作为特定的用户进行身份验证,以便获得适当的访问权限。特别是,当针对安全kafka集群运行streams应用程序时,运行该应用程序的主体必须设置acl,以便该应用程序具有创建内部主题的权限。
由于所有内部主题以及嵌入的使用者组名称都以应用程序id作为前缀,建议使用前缀资源模式上的ACL来配置控制列表,以允许客户端管理以该前缀开头的所有主题和使用者组,如--resource pattern type prefixed--topic--operation all(有关详细信息,请参阅kip-277和kip-290)。
例如,给定streams应用程序的以下设置:
•配置
application.id
价值是team1-streams-app1
. • 以kafka群集身份验证team1
用户。•应用程序的编码拓扑读取输入主题input-topic1
以及input-topic2
. • 应用程序的拓扑结构写入到输出主题output-topic1
以及output-topic2
.然后,以下命令将在kafka集群中创建必要的acl,以允许您的应用程序运行: