从flink sql连接到Confluent Kafka云

aamkag61  于 12个月前  发布在  Apache
关注(0)|答案(1)|浏览(135)

当我尝试连接到Confluent云时,出现以下错误。

Caused by: org.apache.kafka.common.KafkaException: java.security.NoSuchAlgorithmException: TLSv1.3 SSLContext not available
    at org.apache.kafka.common.security.ssl.DefaultSslEngineFactory.createSSLContext(DefaultSslEngineFactory.java:268)
    at org.apache.kafka.common.security.ssl.DefaultSslEngineFactory.configure(DefaultSslEngineFactory.java:173)
    at org.apache.kafka.common.security.ssl.SslFactory.instantiateSslEngineFactory(SslFactory.java:140)
    at org.apache.kafka.common.security.ssl.SslFactory.configure(SslFactory.java:97)
    at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:180)
    ... 18 more
Caused by: java.security.NoSuchAlgorithmException: TLSv1.3 SSLContext not available
    at java.base/sun.security.jca.GetInstance.getInstance(GetInstance.java:159)
    at java.base/javax.net.ssl.SSLContext.getInstance(SSLContext.java:168)
    at org.apache.kafka.common.security.ssl.DefaultSslEngineFactory.createSSLContext(DefaultSslEngineFactory.java:243)

字符串
我使用的flink SQL:

CREATE TABLE IF NOT EXISTS some_source_table
(
    headers     VARCHAR NOT NULL,
    id          VARCHAR NOT NULL,
    `timestamp` TIMESTAMP_LTZ(3) NULL,
    type        VARCHAR NOT NULL,
    contentJson VARCHAR NOT NULL
) WITH (
    'connector' = 'kafka',
    'topic-pattern' = 'kafka_topic__.+?',
    'properties.bootstrap.servers' = 'some.aws.confluent.cloud:9092',
    'properties.group.id' = 'some-id-1',
    'scan.startup.mode' = 'latest-offset',
    'format' = 'json',
    'json.timestamp-format.standard' = 'ISO-8601',
    'scan.topic-partition-discovery.interval'= '60000',
    'json.fail-on-missing-field' = 'false',
    'json.ignore-parse-errors' = 'true',
    'properties.security.protocol' = 'SASL_SSL',
    'properties.sasl.mechanism' = 'PLAIN',
    'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule required username=*** password=***;',
    'properties.ssl.endpoint.identification.algorithm' = 'https'
);


当我检查运行任务管理器的Kubernetes pod中的Java版本时,我看到了11.0.18
UPDATE我的gradle文件如下所示:implementation“org. apache.flink:flink-streaming-java:${flinkVersion}”implementation“org. apache.flink:flink-table-api-java-bridge:${flinkVersion}”implementation“org.apache.flink:flink-table-planner_${scalaVersion}:${flinkVersion}”implementation“org. apache. flink:flink-json:${flinkVersion}”implementation“org.apache.flink:flink-clients:${flinkVersion}”

//this seems to be needed : removing this is causing error in guice JsonProperty.Naming
implementation "com.fasterxml.jackson.core:jackson-databind:${jacksonVersion}"
implementation "com.fasterxml.jackson.datatype:jackson-datatype-jsr310:${jacksonVersion}"

implementation "org.apache.flink:flink-statebackend-rocksdb:${flinkVersion}"

//needed for a local flink ui to show when running environment = local
implementation "org.apache.flink:flink-runtime-web:${flinkVersion}"

implementation "org.apache.logging.log4j:log4j-core:${log4jVersion}"
implementation "org.apache.logging.log4j:log4j-api:${log4jVersion}"
implementation "org.apache.logging.log4j:log4j-slf4j-impl:${log4jVersion}"

implementation 'io.jsonwebtoken:jjwt:0.2'
implementation 'com.mashape.unirest:unirest-java:1.4.9'
implementation 'org.rocksdb:rocksdbjni:7.9.2'

// --------------------------------------------------------------
// Dependencies that should be part of the shadow jar, e.g.
// connectors. These must be in the flinkShadowJar configuration!
// --------------------------------------------------------------
flinkShadowJar files('libs/flink-connector-kafka-1.16.0.jar')
flinkShadowJar "org.apache.flink:flink-connector-base:${flinkVersion}"

flinkShadowJar "org.apache.commons:commons-text:1.10.0"
flinkShadowJar "org.projectlombok:lombok:1.18.26"
flinkShadowJar "com.fasterxml.jackson.core:jackson-databind:${jacksonVersion}"
flinkShadowJar "com.fasterxml.jackson.datatype:jackson-datatype-jsr310:${jacksonVersion}"
flinkShadowJar "com.sailpoint:atlas:${ATLAS_VERSION}"
flinkShadowJar "com.sailpoint:atlas-event:${ATLAS_VERSION}"
flinkShadowJar "com.google.inject:guice:5.1.0"
flinkShadowJar "org.apache.flink:flink-s3-fs-hadoop:${flinkVersion}"
flinkShadowJar 'io.jsonwebtoken:jjwt:0.2'
flinkShadowJar 'com.mashape.unirest:unirest-java:1.4.9'


我能错过什么呢?

sczxawaw

sczxawaw1#

我假设您使用的是Flink's SQL connector,它在那里遮蔽了Kafka Clients对象。由于遮蔽,这意味着JAAS Config需要指向遮蔽的PlainLoginModule,例如:

'properties.sasl.jaas.config' = 'org.apache.flink.kafka.shaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="key" password="value";'

字符串

相关问题