从S3读取Kafka存储文件位置

doinxwow  于 2023-10-15  发布在  Apache
关注(0)|答案(1)|浏览(138)

我们得到下面的错误,所以我们开始从databricks笔记本上的S3位置(s3://my-bucket/tmp/k2/truststore.jks)获取Kafka密钥和证书

  1. DbxDlTransferError: Terminated with exception: Kafka store file location only supports external location or UC Volume path on Shared cluster. Use external location or UC Volume Path to provide it.: None

但是,当阅读S3位置时,我们得到另一个错误

  1. Caused by: java.nio.file.NoSuchFileException: s3:/my-bucket/tmp/k2/truststore.jks
  2. at sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)
  3. at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
  4. at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)
  5. at sun.nio.fs.UnixFileSystemProvider.newByteChannel(UnixFileSystemProvider.java:214)
  6. at java.nio.file.Files.newByteChannel(Files.java:361)
  7. at java.nio.file.Files.newByteChannel(Files.java:407)
  8. at java.nio.file.spi.FileSystemProvider.newInputStream(FileSystemProvider.java:384)
  9. at java.nio.file.Files.newInputStream(Files.java:152)
  10. at kafkashaded.org.apache.kafka.common.security.ssl.DefaultSslEngineFactory$FileBasedStore.load(DefaultSslEngineFactory.java:368)
  11. ... 86 more

我们用“//”传递S3位置,但当java读取它时,只有“/”斜杠。
我试着在网上找,但找不到任何解决方案。如果有人有什么想法,请在这里分享。Thanks in advance
这是我的代码

  1. truststore_location = "s3://my-bucket/tmp/k2/truststore.jks"
  2. cluster_ca_certificate_location = "s3://my-bucket/tmp/k2/cluster-ca-certificate.pem"
  3. kafka_server = "server_ip:9093"
  4. kafka_topic = "kafka_topic"
  5. kafka_group_id = "group_id"
  6. scram_login_module = 'org.apache.kafka.common.security.scram.ScramLoginModule required username="" password=""'
  7. input_df = (
  8. spark
  9. .read
  10. .format("kafka")
  11. .option("kafka.bootstrap.servers", kafka_server)
  12. .option("kafka.ssl.truststore.location", truststore_location)
  13. .option("kafka.ssl.truststore.password", truststore_pass)
  14. .option("kafka.ssl.ca.location", cluster_ca_certificate_location)
  15. .option("kafka.security.protocol", "SASL_SSL")
  16. .option("kafka.sasl.mechanism", "SCRAM-SHA-256")
  17. .option("kafka.sasl.jaas.config", "kafkashaded.org.apache.kafka.common.security.scram.ScramLoginModule required username='{}' password='{}';".format(consumer_username, consumer_password))
  18. .option("subscribe", kafka_topic)
  19. .option("kafka.group.id", kafka_group_id)
  20. .option("failOnDataLoss", "false")
  21. .load()
  22. )
zbdgwd5y

zbdgwd5y1#

只有“/”斜线。
从stacktrace中,你可以看到Spark使用了Java Files.newInputStreamUnixFileSystemProvider。它只能读取本地文件,不能读取S3存储桶,因此无法理解s3://
在编写任何Spark代码之前,您需要使用S3 SDK首先将文件下载到temp目录中
例如,不要使用笔记本,而是使用shell脚本

  1. aws s3 cp s3://.../file.jks /tmp/file.jks
  2. spark-submit --files /tmp/file.jks ...

或者,您需要将选项kafka.ssl.engine.factory.class设置为从S3而不是本地磁盘读取文件的JVM implementation

相关问题