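I set up a ksql server in OpenShift and connected it to an on-premise Cloudera Kafka cluster (CDH6), which is kerberized and uses SSL. When I run `LIST TOPICS` or `PRINT` commands, everything works. However, as soon as I try to create a stream, I get the following error: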
Could not write the statement 'create stream dev_abc (date varchar, timestamp varchar, latitude varchar, longitude varchar) WITH (KAFKA_TOPIC='topic123', VALUE_FORMAT='JSON');' into the command topic: Transactional Id authorization failed.
Caused by: Transactional Id authorization failed.
When looking at the log file, I also see the following error:
[2020-11-18 11:53:58,090] INFO Processed unsuccessfully: KsqlRequest{ksql='CREATE STREAM KSQL_PROCESSING_LOG (logger VARCHAR, level VARCHAR, time BIGINT, message STRUCT<type INT, deserializationError STRUCT<target VARCHAR, errorMessage VARCHAR, recordB64 VARCHAR, cause ARRAY<VARCHAR>, `topic` VARCHAR>, recordProcessingError STRUCT<errorMessage VARCHAR, record VARCHAR, cause ARRAY<VARCHAR>>, productionError STRUCT<errorMessage VARCHAR>, serializationError STRUCT<target VARCHAR, errorMessage VARCHAR, record VARCHAR, cause ARRAY<VARCHAR>, `topic` VARCHAR>, kafkaStreamsThreadError STRUCT<errorMessage VARCHAR, threadName VARCHAR, cause ARRAY<VARCHAR>>>) WITH(KAFKA_TOPIC='service_uykh7k6ksql_processing_log', VALUE_FORMAT='JSON');', configOverrides={}, requestProperties={}, commandSequenceNumber=Optional[-1]}, reason: Could not write the statement 'CREATE STREAM KSQL_PROCESSING_LOG (logger VARCHAR, level VARCHAR, time BIGINT, message STRUCT<type INT, deserializationError STRUCT<target VARCHAR, errorMessage VARCHAR, recordB64 VARCHAR, cause ARRAY<VARCHAR>, `topic` VARCHAR>, recordProcessingError STRUCT<errorMessage VARCHAR, record VARCHAR, cause ARRAY<VARCHAR>>, productionError STRUCT<errorMessage VARCHAR>, serializationError STRUCT<target VARCHAR, errorMessage VARCHAR, record VARCHAR, cause ARRAY<VARCHAR>, `topic` VARCHAR>, kafkaStreamsThreadError STRUCT<errorMessage VARCHAR, threadName VARCHAR, cause ARRAY<VARCHAR>>>) WITH(KAFKA_TOPIC='service_abc_processing_log', VALUE_FORMAT='JSON');' into the command topic: Transactional Id authorization failed. (io.confluent.ksql.rest.server.resources.KsqlResource:301)
I am currently using the following configuration:
Image: confluentinc/ksqldb-server:0.13.0 (but I also tried older ones)
ksql-server.properties:
listeners=http://0.0.0.0:8088
# to avoid Attempted to write a non-default includeClusterAuthorizedOperations at version 7
ksql.access.validator.enable=off
kafka.confluent.support.metrics.enable=false
security.protocol=SASL_SSL
sasl.mechanism=GSSAPI
ssl.truststore.location=/.../.../truststore.jks
ssl.truststore.password=XXXXX
ssl.truststore.type=JKS
sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab="blablub.keytab" serviceName="kafka" principal="principalname";
ksql.service.id=myservicename
# authentication for producers, needed for ksql commands like "Create Stream"
producer.ssl.endpoint.identification.algorithm=HTTPS
producer.security.protocol=SASL_SSL
producer.ssl.truststore.location=/.../truststore.jks
producer.ssl.truststore.password=XXXXX
producer.sasl.mechanism=GSSAPI
producer.sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab="....keytab" serviceName="kafka" principal="principalname";
# authentication for consumers, needed for ksql commands like "Create Stream"
consumer.ssl.endpoint.identification.algorithm=HTTPS
consumer.security.protocol=SASL_SSL
consumer.ssl.truststore.location=/..../truststore.jks
consumer.ssl.truststore.password=XXXXX
consumer.sasl.mechanism=GSSAPI
consumer.sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab="/.....keytab" serviceName="kafka" principal="principalname";
# ------ Logging config -------
# Automatically create the processing log topic if it does not already exist:
ksql.logging.processing.topic.auto.create=false
ksql.logging.processing.topic.name=abc_processing_log
# Automatically create a stream within KSQL for the processing log:
ksql.logging.processing.stream.auto.create=true
# ------ External service config -------
# The set of Kafka brokers to bootstrap Kafka cluster information from:
bootstrap.servers=.....:9093,.....:9093,......:9093
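I found a lot of material about using Kafka ACLs to avoid this kind of error, but ACLs are not supported by CDH. Do you have any ideas on how to continue the analysis?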
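One possible next step, sketched here rather than confirmed: work out which producer settings and which transactional id are actually involved, so the same call can be tried outside ksqlDB. The Python below is a minimal sketch that reads a properties text like the one above, merges the shared client settings with the `producer.` overrides, and adds a `transactional.id`. It assumes that the command-topic producer takes its transactional id from `ksql.service.id` and that `producer.`-prefixed keys override the unprefixed ones. The helper names `parse_properties` and `effective_producer_config` are made up for illustration.

```python
def parse_properties(text):
    """Parse simple key=value lines, skipping blank lines and # comments."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        # Split on the first '=' only, so JAAS values containing '=' stay intact.
        key, value = line.split("=", 1)
        props[key.strip()] = value.strip()
    return props


# Unprefixed client settings that a Kafka producer would also need.
CLIENT_PREFIXES = ("bootstrap.", "security.", "sasl.", "ssl.")


def effective_producer_config(props):
    """Merge shared client settings with producer.* overrides."""
    config = {k: v for k, v in props.items() if k.startswith(CLIENT_PREFIXES)}
    for key, value in props.items():
        if key.startswith("producer."):
            config[key[len("producer."):]] = value
    # ASSUMPTION: the command-topic producer uses ksql.service.id
    # as its transactional.id.
    if "ksql.service.id" in props:
        config["transactional.id"] = props["ksql.service.id"]
    return config


# Values copied from the question; hosts and secrets are left out.
SAMPLE = """\
security.protocol=SASL_SSL
sasl.mechanism=GSSAPI
ksql.service.id=myservicename
producer.security.protocol=SASL_SSL
producer.ssl.endpoint.identification.algorithm=HTTPS
"""

if __name__ == "__main__":
    merged = effective_producer_config(parse_properties(SAMPLE))
    for key, value in sorted(merged.items()):
        print(f"{key}={value}")
```

The merged settings could then be given to a standalone transactional Kafka producer using the same keytab. If calling `initTransactions()` there fails with the same "Transactional Id authorization failed" message, the cause would be the broker-side authorization for that transactional id rather than ksqlDB itself.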