我在一家公司的VDI工作,出于安全原因,他们使用自己的工件。目前我正在编写单元测试,以执行从增量表中删除条目的函数的测试。当我开始时,我收到了一个未解析依赖项的错误,因为我的Spark会话被配置为从Maven加载jar文件。现在我的代码看起来像这样:
class TestTransformation(unittest.TestCase):
@classmethod
def test_ksu_deletion(self):
self.spark = SparkSession.builder\
.appName('SPARK_DELETION')\
.config("spark.delta.logStore.class", "org.apache.spark.sql.delta.storage.S3SingleDriverLogStore")\
.config("spark.jars", "/opt/spark/jars/delta-core_2.12-0.7.0.jar, /opt/spark/jars/hadoop-aws-3.2.0.jar")\
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")\
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")\
.getOrCreate()
os.environ["KSU_DELETION_OBJECT"]="UNITTEST/"
deltatable = DeltaTable.forPath(self.spark, "/projects/some/path/snappy.parquet")
deltatable.delete(col("DATE") < get_current()
但是,我收到错误消息:
E py4j.protocol.Py4JJavaError: An error occurred while calling z:io.delta.tables.DeltaTable.forPath.
E : java.lang.NoSuchMethodError: org.apache.spark.sql.AnalysisException.<init>(Ljava/lang/String;Lscala/Option;Lscala/Option;Lscala/Option;Lscala/Option;)V
你知道这是什么原因造成的吗?我假设这与我配置spark.sql.extions和/或spark.sql.catalog的方式有关,但老实说,我是Spark的新手。我非常感谢任何提示。
提前多谢了!
编辑:我们使用的是Spark 3.0.2(Scala 2.12.10)。根据https://docs.delta.io/latest/releases.html,这应该是兼容的。除了SparkSession之外,我将后续代码精简为
df.spark.read.parquet(Path/to/file.snappy.parquet)
现在我收到错误消息
java.lang.IncompatibleClassChangeError: class org.apache.spark.sql.catalyst.plans.logical.DeltaDelete has interface org.apache.spark.sql.catalyst.plans.logical.UnaryNode as super class
正如我所说,我是相当新的(Py)Spark,所以请不要犹豫,提到你认为完全显而易见的事情。
编辑2:在运行代码之前,我检查了Shell中导出的Python路径,我可以看到以下内容:x1c 0d1x这会导致任何问题吗?我不明白为什么我在pipenv中运行代码时没有得到这个错误(使用spark-submit)
1条答案
按热度按时间hi3rlvi21#
看起来您使用的是Delta lake库的不兼容版本。0.7.0是Spark 3.0的版本,但您使用的是另一个版本--或者更低,或者更高。请查阅Delta releases page以找到Delta版本和所需Spark版本之间的Map。
如果你使用的是Spark 3.1或3.2,可以考虑使用delta-spark Python包,它会安装所有必要的依赖项,所以你只需要导入
DeltaTable
类。更新:是的,发生这种情况是因为版本冲突--你需要删除
delta-spark
和pyspark
Python包,并显式安装pyspark==3.0.2
。另外,看看pytest-spark包,它可以简化所有测试的配置规范。你可以找到它的例子+ Delta here。