Writing Delta files to S3 (MinIO) - PySpark 2.4.3

gg0vcinb posted on 2021-05-29 in Hadoop

I am currently writing Delta Lake parquet files to S3, which I replace locally with MinIO.
I can read and write standard parquet files to S3 just fine.
However, when I follow the Delta Lake example for configuring Delta to S3, I cannot seem to write the delta_log/ directory to my MinIO.
So I tried setting fs.AbstractFileSystem.s3a.impl as well as fs.s3a.impl.
I am using pyspark[sql]==2.4.3 inside a venv. src/.env:


```
# pyspark packages
DELTA = io.delta:delta-core_2.11:0.3.0
HADOOP_COMMON = org.apache.hadoop:hadoop-common:2.7.3
HADOOP_AWS = org.apache.hadoop:hadoop-aws:2.7.3
PYSPARK_SUBMIT_ARGS = ${HADOOP_AWS},${HADOOP_COMMON},${DELTA}
```

`src/spark_session.py`:

```
# configure s3 connection for read/write operation (native spark)
hadoop_conf = sc.sparkContext._jsc.hadoopConfiguration()
hadoop_conf.set("fs.s3a.endpoint", self.aws_endpoint_url)
hadoop_conf.set("fs.s3a.access.key", self.aws_access_key_id)
hadoop_conf.set("fs.s3a.secret.key", self.aws_secret_access_key)

hadoop_conf.set("fs.AbstractFileSystem.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")  # when using hadoop 2.8.5
hadoop_conf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")  # alternative to the above for hadoop 2.8.5

hadoop_conf.set("fs.s3a.path.style.access", "true")
hadoop_conf.set("spark.history.fs.logDirectory", 's3a://spark-logs-test/')
```
`src/apps/raw_to_parquet.py`:

```
# Trying to write the pyspark dataframe to MinIO (S3)
raw_df.coalesce(1).write.format("delta").save(s3_url)
```

bash:


```
# RUN CODE
spark-submit --packages $(PYSPARK_SUBMIT_ARGS) src/run_onlineretailer.py
```

Error with hadoop-common:2.7.3, hadoop-aws:2.7.3: `java.lang.RuntimeException: java.lang.NoSuchMethodException: org.apache.hadoop.fs.s3a.S3AFileSystem.<init>(java.net.URI, org.apache.hadoop.conf.Configuration)`. With this error I updated to hadoop-common:2.8.5, hadoop-aws:2.8.5 to fix the NoSuchMethodException, because Delta needs the S3AFileSystem. That then fails with: `py4j.protocol.Py4JJavaError: An error occurred while calling o89.save. : java.lang.NoSuchMethodError: org.apache.hadoop.security.ProviderUtils.excludeIncompatibleCredentialProviders(Lorg/apache/hadoop/conf/Configuration;Ljava/lang/Class;)Lorg/apache/hadoop/conf/Configuration`.
So it seems to me that the parquet files can be written without any problem, but Delta creates these delta_log folders that cannot be handled (I think?).
Current source code.
I have read several similar questions, but nobody seems to have tried this with the Delta Lake folders.
UPDATE
It currently works with the following settings:


```
# pyspark packages
DELTA_LOGSTORE = spark.delta.logStore.class=org.apache.spark.sql.delta.storage.S3SingleDriverLogStore
DELTA = io.delta:delta-core_2.11:0.3.0
HADOOP_COMMON = org.apache.hadoop:hadoop-common:2.7.7
HADOOP_AWS = org.apache.hadoop:hadoop-aws:2.7.7
PYSPARK_SUBMIT_ARGS = ${HADOOP_AWS},${HADOOP_COMMON},${DELTA}
PYSPARK_CONF_ARGS = ${DELTA_LOGSTORE}
```

```
# configure s3 connection for read/write operation (native spark)
hadoop_conf = sc.sparkContext._jsc.hadoopConfiguration()
hadoop_conf.set("fs.s3a.endpoint", self.aws_endpoint_url)
hadoop_conf.set("fs.s3a.access.key", self.aws_access_key_id)
hadoop_conf.set("fs.s3a.secret.key", self.aws_secret_access_key)
```

```
spark-submit --packages $(PYSPARK_SUBMIT_ARGS) --conf $(PYSPARK_CONF_ARGS) src/run_onlineretailer.py
```
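Once that submit goes through, an easy sanity check is to load the table back with the Delta reader and confirm the log folder was written. This is only a sketch; the `s3a://` path is a placeholder for whatever `s3_url` the write used, and `sc` is the SparkSession returned by `spark_init`:

```
# Sketch only: read the Delta table back to verify the write (path is illustrative).
verify_df = sc.read.format("delta").load("s3a://my-bucket/online_retailer")
verify_df.show(5)
```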

The strange thing is that it only works this way.
If I try to set it with sc.conf or hadoop_conf it does not work; see the commented-out lines in the code below:

```
def spark_init(self) -> SparkSession:

    sc: SparkSession = SparkSession \
        .builder \
        .appName(self.app_name) \
        .config("spark.sql.warehouse.dir", self.warehouse_location) \
        .getOrCreate()

    # set log level
    sc.sparkContext.setLogLevel("WARN")

    # Enable Arrow-based columnar data transfers
    sc.conf.set("spark.sql.execution.arrow.enabled", "true")

    # sc.conf.set("spark.delta.logStore.class", "org.apache.spark.sql.delta.storage.S3SingleDriverLogStore") # does not work

    # configure s3 connection for read/write operation (native spark)
    hadoop_conf = sc.sparkContext._jsc.hadoopConfiguration()
    hadoop_conf.set("fs.s3a.endpoint", self.aws_endpoint_url)
    hadoop_conf.set("fs.s3a.access.key", self.aws_access_key_id)
    hadoop_conf.set("fs.s3a.secret.key", self.aws_secret_access_key)
    #hadoop_conf.set("spark.delta.logStore.class", "org.apache.spark.sql.delta.storage.S3SingleDriverLogStore") # does not work

    return sc
```

If someone could explain this, that would be great. Is it because of .getOrCreate()? It seems impossible to set the conf without this call, other than from the command line when running the application.
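One hedged explanation: spark.delta.logStore.class needs to be in the Spark configuration the driver is created with, so setting it via sc.conf.set after getOrCreate(), or on the Hadoop configuration, comes too late or lands in the wrong place. A minimal sketch of the programmatic alternative to --conf, assuming the setting is accepted on the builder (the app name here is illustrative):

```
# Sketch, not the original code: put the Delta LogStore class on the builder
# so it is part of the SparkConf before the session is created.
from pyspark.sql import SparkSession

sc: SparkSession = SparkSession \
    .builder \
    .appName("onlineretailer") \
    .config("spark.delta.logStore.class",
            "org.apache.spark.sql.delta.storage.S3SingleDriverLogStore") \
    .getOrCreate()
```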


r1zhe5dt1#

You are mixing hadoop-* JARs; just like Spark's own JARs, they only work when they all come from the same version.
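In practice that means hadoop-aws and hadoop-common (and the Hadoop your Spark distribution was built against) must be pinned to exactly the same version. As an illustration only, deriving both coordinates from a single version variable avoids the drift; the 2.7.7 pair is the one that worked in the update above:

```
# Illustration only: build the --packages string from one Hadoop version
# so hadoop-aws and hadoop-common can never get out of sync.
HADOOP_VERSION = "2.7.7"  # should match the Hadoop version of your Spark build
packages = ",".join([
    "org.apache.hadoop:hadoop-aws:" + HADOOP_VERSION,
    "org.apache.hadoop:hadoop-common:" + HADOOP_VERSION,
    "io.delta:delta-core_2.11:0.3.0",
])
print(packages)  # pass the result to: spark-submit --packages <packages> ...
```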
