我有Spark 3.1.2运行在K8S的客户端模式下(我有8个worker)。我设置了一个NFS存储器来更新存储在它上面的delta文件。我的Spark向我抛出了以下错误:
java.io.IOException: Cannot create file:/spark-nfs/v_data/delta/table_1/_delta_log
at org.apache.spark.sql.delta.DeltaLog.ensureLogDirectoryExist(DeltaLog.scala:290)
抛出错误的代码是:
df.write.partitionBy("Cod").format('delta').save(path="/spark-nfs/v_data/delta/table_1/", mode='overwrite')
我的spark配置是:
self.conf = {
"spark.network.timeout": 36000000,
"spark.executor.heartbeatInterval": 36000000,
"spark.storage.blockManagerSlaveTimeoutMs": 36000000,
"spark.driver.maxResultSize": "30g",
"spark.sql.session.timeZone": "UTC",
"spark.driver.extraJavaOptions": "-Duser.timezone=GMT",
"spark.executor.extraJavaOptions": "-Duser.timezone=GMT",
"spark.driver.host": pod_ip,
"spark.driver.memory": executor_memory,
"spark.memory.offHeap.enabled": True,
"spark.memory.offHeap.size": executor_memory,
"spark.sql.legacy.parquet.int96RebaseModeInRead" : "CORRECTED",
"spark.sql.legacy.parquet.int96RebaseModeInWrite" : "CORRECTED",
"spark.sql.legacy.parquet.datetimeRebaseModeInRead" : "CORRECTED",
"spark.sql.legacy.parquet.datetimeRebaseModeInWrite" : "CORRECTED",
"fs.permissions.umask-mode": "777"
}
我使用的是io.delta:delta-core_2.12:1.0.0
。
那么,既然我授予了完全权限,为什么我不能创建增量日志文件呢?
- 注意:只有
_delta_log
文件未创建, parquet 文件通常在目录中创建。**
- 注意:只有
1条答案
按热度按时间vybvopom1#
基本上,当我在客户端模式下使用spark时,当我启动一个作业时(我是通过Airflow启动的),调用spark的节点(在我的例子中是Airflow worker)是Spark的主节点,所以我必须将所有节点(spark、airflow)指向NFS写入(我只将spark worker指向NFS)。