我们得到下面的错误,所以我们开始从databricks笔记本上的S3位置(s3://my-bucket/tmp/k2/truststore.jks)获取Kafka密钥和证书
DbxDlTransferError: Terminated with exception: Kafka store file location only supports external location or UC Volume path on Shared cluster. Use external location or UC Volume Path to provide it.: None
但是,当阅读S3位置时,我们得到另一个错误
Caused by: java.nio.file.NoSuchFileException: s3:/my-bucket/tmp/k2/truststore.jks
at sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)
at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)
at sun.nio.fs.UnixFileSystemProvider.newByteChannel(UnixFileSystemProvider.java:214)
at java.nio.file.Files.newByteChannel(Files.java:361)
at java.nio.file.Files.newByteChannel(Files.java:407)
at java.nio.file.spi.FileSystemProvider.newInputStream(FileSystemProvider.java:384)
at java.nio.file.Files.newInputStream(Files.java:152)
at kafkashaded.org.apache.kafka.common.security.ssl.DefaultSslEngineFactory$FileBasedStore.load(DefaultSslEngineFactory.java:368)
... 86 more
我们用“//”传递S3位置,但当java读取它时,只有“/”斜杠。
我试着在网上找,但找不到任何解决方案。如果有人有什么想法,请在这里分享。Thanks in advance
这是我的代码
truststore_location = "s3://my-bucket/tmp/k2/truststore.jks"
cluster_ca_certificate_location = "s3://my-bucket/tmp/k2/cluster-ca-certificate.pem"
kafka_server = "server_ip:9093"
kafka_topic = "kafka_topic"
kafka_group_id = "group_id"
scram_login_module = 'org.apache.kafka.common.security.scram.ScramLoginModule required username="" password=""'
input_df = (
spark
.read
.format("kafka")
.option("kafka.bootstrap.servers", kafka_server)
.option("kafka.ssl.truststore.location", truststore_location)
.option("kafka.ssl.truststore.password", truststore_pass)
.option("kafka.ssl.ca.location", cluster_ca_certificate_location)
.option("kafka.security.protocol", "SASL_SSL")
.option("kafka.sasl.mechanism", "SCRAM-SHA-256")
.option("kafka.sasl.jaas.config", "kafkashaded.org.apache.kafka.common.security.scram.ScramLoginModule required username='{}' password='{}';".format(consumer_username, consumer_password))
.option("subscribe", kafka_topic)
.option("kafka.group.id", kafka_group_id)
.option("failOnDataLoss", "false")
.load()
)
1条答案
按热度按时间zbdgwd5y1#
只有“/”斜线。
从stacktrace中,你可以看到Spark使用了Java
Files.newInputStream
和UnixFileSystemProvider
。它只能读取本地文件,不能读取S3存储桶,因此无法理解s3://
。在编写任何Spark代码之前,您需要使用S3 SDK首先将文件下载到temp目录中
例如,不要使用笔记本,而是使用shell脚本
或者,您需要将选项
kafka.ssl.engine.factory.class
设置为从S3而不是本地磁盘读取文件的JVM implementation。