Reading files from MinIO with Spark in Docker

2wnc66cl · asked 12 months ago · Docker

I'm trying to build some kind of data lake/warehouse. I have a MinIO instance on localhost:9000, and I can use the UI and upload files, so far so good.
I created a network named spark_network and started MinIO:

docker volume create s3_drive
docker run --network spark_network --network-alias iceberg-lake.minio -p 9000:9000 -p 9001:9001 -v s3_drive:/data -e MINIO_ROOT_USER=username -e MINIO_ROOT_PASSWORD=password -d --name minio quay.io/minio/minio server /data --console-address ":9001"

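A quick way to confirm the server itself is up (my addition, not from the original post) is MinIO's liveness endpoint, which should return HTTP 200:

curl -i http://localhost:9000/minio/health/live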
I then built a Docker image from this Dockerfile:

FROM quay.io/jupyter/all-spark-notebook

# Set the working directory to /code
WORKDIR /code

# Copy PostgreSQL driver
COPY postgresql-42.7.1.jar /opt/jars/postgresql-42.7.1.jar

# Expose Spark UI and JupyterLab ports
EXPOSE 4040 8888

# Start JupyterLab on container start
CMD ["start-notebook.sh", "--NotebookApp.token=''"]


followed by:

docker volume create jupyter_code
docker run -p 8888:8888 -p 4040:4040 -e AWS_REGION=us-east-1 -e MINIO_REGION=us-east-1 -e AWS_S3_ENDPOINT=http://minio:9000 -e AWS_ACCESS_KEY_ID=key_id -e AWS_SECRET_ACCESS_KEY=secret_key --network spark_network -v jupyter_code:/code -d --name spark spark-jupyterlab
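
As a sanity check (again my addition), it's worth confirming that the Spark container can resolve and reach MinIO over spark_network; assuming curl is available in the image:

docker exec spark curl -sI http://minio:9000/minio/health/live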


So everything is fine, and I also have a Postgres container on the same network.
When I try to read from the MinIO server with Spark (from JupyterLab), I create the session like this:

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("iceberg_jdbc") \
    .config("spark.jars", "/opt/jars/postgresql-42.7.1.jar") \
    .config('spark.jars.packages', 'org.apache.hadoop:hadoop-aws:3.3.4,org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.4.2,org.apache.iceberg:iceberg-aws-bundle:1.4.2') \
    .config('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions') \
    .config('spark.sql.catalog.cves', 'org.apache.iceberg.spark.SparkCatalog') \
    .config('spark.sql.catalog.cves.io-impl', 'org.apache.iceberg.aws.s3.S3FileIO') \
    .config('spark.sql.catalog.cves.warehouse', 's3a://iceberg-lake/lake/') \
    .config('spark.sql.catalog.cves.s3.endpoint', 'http://minio:9000') \
    .config('spark.sql.catalog.cves.s3.path-style-access', 'true') \
    .config('spark.sql.catalog.cves.catalog-impl', 'org.apache.iceberg.jdbc.JdbcCatalog') \
    .config('spark.sql.catalog.cves.uri', 'jdbc:postgresql://postgres-test:5432/metastore') \
    .config('spark.sql.catalog.cves.jdbc.verifyServerCertificate', 'false') \
    .config('spark.sql.catalog.cves.jdbc.useSSL','false') \
    .config('spark.sql.catalog.cves.jdbc.user', '<pg user>') \
    .config('spark.sql.catalog.cves.jdbc.password','<pg password>') \
    .config("spark.hadoop.fs.s3a.access.key", "<access-key>") \
    .config("spark.hadoop.fs.s3a.secret.key", "<secret-key>") \
    .getOrCreate()


After all this, I try to read a JSON file from the bucket named "iceberg-lake" on MinIO.
I've tried it without changing or adding any policies or users (using the access keys, of course), and also with dedicated users, policies, and so on. Whenever I try to read, the end result is always the same.

df = spark.read.json("s3a://iceberg-lake/lake/1999/0xxx/CVE-1999-0001.json")


I get this:

Py4JJavaError: An error occurred while calling o60.json.
: java.nio.file.AccessDeniedException: s3a://iceberg-lake/lake/1999/0xxx/CVE-1999-0001.json: getFileStatus on s3a://iceberg-lake/lake/1999/0xxx/CVE-1999-0001.json: com.amazonaws.services.s3.model.AmazonS3Exception: Forbidden (Service: Amazon S3; Status Code: 403; Error Code: 403 Forbidden; Request ID: AGJWYZYKWD26CZXH; S3 Extended Request ID: 0haOOhMeMtsgJN5OMwmehLBWVFkDqZmy8WHvA+Tiym4IBNR0x88kIPEH3ddonlVPS6/FXcOyfrI=; Proxy: null), S3 Extended Request ID: 0haOOhMeMtsgJN5OMwmehLBWVFkDqZmy8WHvA+Tiym4IBNR0x88kIPEH3ddonlVPS6/FXcOyfrI=:403 Forbidden
    at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:255)
    at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:175)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:3796)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.innerGetFileStatus(S3AFileSystem.java:3688)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.lambda$exists$34(S3AFileSystem.java:4703)
    at org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.lambda$trackDurationOfOperation$5(IOStatisticsBinding.java:499)
    at org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDuration(IOStatisticsBinding.java:444)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.trackDurationAndSpan(S3AFileSystem.java:2337)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.trackDurationAndSpan(S3AFileSystem.java:2356)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.exists(S3AFileSystem.java:4701)
    at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$checkAndGlobPathIfNecessary$4(DataSource.scala:756)
    at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$checkAndGlobPathIfNecessary$4$adapted(DataSource.scala:754)
    at org.apache.spark.util.ThreadUtils$.$anonfun$parmap$2(ThreadUtils.scala:380)
    at scala.concurrent.Future$.$anonfun$apply$1(Future.scala:659)
    at scala.util.Success.$anonfun$map$1(Try.scala:255)
    at scala.util.Success.map(Try.scala:213)
    at scala.concurrent.Future.$anonfun$map$1(Future.scala:292)
    at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33)
    at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
    at java.base/java.util.concurrent.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1395)
    at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:373)
    at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1182)
    at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1655)
    at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1622)


I've tried different policies and different keys, with and without a dedicated user. I deleted all the images and rebuilt from scratch, tried fresh containers, everything. Still, I can't read a single JSON file (and eventually I want to read the whole folder, of course).
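One way to check whether the keys and bucket policy are valid at all, independently of Spark (an editorial suggestion, not something from the original post), is the MinIO client mc, here using the root credentials from the MinIO container above:

mc alias set local http://localhost:9000 username password
mc ls local/iceberg-lake/lake/1999/0xxx/

If this listing also fails, the problem is the credentials or policy; if it succeeds, the problem is on the Spark side.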
Can anyone help?

Answer 1 (mf98qq94):

The fact that the error carries an S3 extended request ID means the response came from AWS, not from MinIO.
Why not set spark.hadoop.fs.s3a.endpoint to your MinIO endpoint, so the s3a connector picks up the value? The Iceberg catalog settings (spark.sql.catalog.cves.s3.endpoint) only apply to Iceberg's S3FileIO, not to a plain spark.read over an s3a:// path.
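
A minimal sketch of that fix (my addition, not part of the answer; the placeholder keys are the same ones used in the question, and path-style access plus disabled SSL are the usual settings for a plain-HTTP MinIO setup):

from pyspark.sql import SparkSession

# Point the s3a connector itself at MinIO. Without fs.s3a.endpoint,
# hadoop-aws falls back to the public AWS S3 endpoint, which is why
# the 403 above carries an AWS extended request ID.
spark = SparkSession.builder \
    .appName("iceberg_jdbc") \
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.3.4") \
    .config("spark.hadoop.fs.s3a.endpoint", "http://minio:9000") \
    .config("spark.hadoop.fs.s3a.path.style.access", "true") \
    .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false") \
    .config("spark.hadoop.fs.s3a.access.key", "<access-key>") \
    .config("spark.hadoop.fs.s3a.secret.key", "<secret-key>") \
    .getOrCreate()

df = spark.read.json("s3a://iceberg-lake/lake/1999/0xxx/CVE-1999-0001.json")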
