PySpark error: java.lang.NoSuchMethodError: 'scala.collection.immutable.Seq org.apache.spark.sql.types.StructType.toAttributes()'

Asked by 5fjcxozz on 2023-03-22 in Spark

I am trying to connect to a MongoDB database from PySpark.

$ pyspark --packages org.mongodb.spark:mongo-spark-connector_2.13:10.1.1

My installed versions: Python 3.9, Scala 2.12.15, Spark 3.3.2.
Inside pyspark I can connect to the database without any problem. I can also load a DataFrame and display the metadata of the DataFrame fetched from MongoDB.

>>> df = spark.read.format("mongodb").option("connection.uri", "mongodb://127.0.0.1/test.testData").load()  
>>> df.columns
['_id', 'x']

Everything works up to this point. But as soon as I try to fetch and view the data, I get the error.

>>> df.first()  # or df.show()

The error:

>>> df.first()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/opt/spark/python/pyspark/sql/dataframe.py", line 1938, in first
    return self.head()
  File "/opt/spark/python/pyspark/sql/dataframe.py", line 1924, in head
    rs = self.head(1)
  File "/opt/spark/python/pyspark/sql/dataframe.py", line 1926, in head
    return self.take(n)
  File "/opt/spark/python/pyspark/sql/dataframe.py", line 868, in take
    return self.limit(num).collect()
  File "/opt/spark/python/pyspark/sql/dataframe.py", line 817, in collect
    sock_info = self._jdf.collectToPython()
  File "/opt/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py", line 1321, in __call__
  File "/opt/spark/python/pyspark/sql/utils.py", line 190, in deco
    return f(*a, **kw)
  File "/opt/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/protocol.py", line 326, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o44.collectToPython.
: java.lang.NoSuchMethodError: 'scala.collection.immutable.Seq org.apache.spark.sql.types.StructType.toAttributes()'
        at com.mongodb.spark.sql.connector.schema.InternalRowToRowFunction.<init>(InternalRowToRowFunction.java:4
        at com.mongodb.spark.sql.connector.schema.RowToBsonDocumentConverter.<init>(RowToBsonDocumentConverter.java:80)
        at com.mongodb.spark.sql.connector.read.MongoScanBuilder.<clinit>(MongoScanBuilder.java:75)
        at com.mongodb.spark.sql.connector.MongoTable.newScanBuilder(MongoTable.java:125)
        at org.apache.spark.sql.execution.datasources.v2.V2ScanRelationPushDown$$anonfun$createScanBuilder$1.applyOrElse(V2ScanRelationPushDown.scala:56)
        at org.apache.spark.sql.execution.datasources.v2.V2ScanRelationPushDown$$anonfun$createScanBuilder$1.applyOrElse(V2ScanRelationPushDown.scala:54)
        at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:584)
        at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:176)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:584)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
        at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$3(TreeNode.scala:589)
        at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren(TreeNode.scala:1228)
        at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren$(TreeNode.scala:1227)
        at org.apache.spark.sql.catalyst.plans.logical.OrderPreservingUnaryNode.mapChildren(LogicalPlan.scala:208)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:589)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
        at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$3(TreeNode.scala:589)
        at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren(TreeNode.scala:1228)
        at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren$(TreeNode.scala:1227)
        at org.apache.spark.sql.catalyst.plans.logical.GlobalLimit.mapChildren(basicLogicalOperators.scala:1258)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:589)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:560)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:528)
        at org.apache.spark.sql.execution.datasources.v2.V2ScanRelationPushDown$.createScanBuilder(V2ScanRelationPushDown.scala:54)
        at org.apache.spark.sql.execution.datasources.v2.V2ScanRelationPushDown$.$anonfun$apply$1(V2ScanRelationPushDown.scala:42)
        at org.apache.spark.sql.execution.datasources.v2.V2ScanRelationPushDown$.$anonfun$apply$7(V2ScanRelationPushDown.scala:50)
        at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
        at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
        at scala.collection.immutable.List.foldLeft(List.scala:91)
        at org.apache.spark.sql.execution.datasources.v2.V2ScanRelationPushDown$.apply(V2ScanRelationPushDown.scala:49)
        at org.apache.spark.sql.execution.datasources.v2.V2ScanRelationPushDown$.apply(V2ScanRelationPushDown.scala:37)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:211)
        at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
        at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
        at scala.collection.immutable.List.foldLeft(List.scala:91)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:208)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:200)
        at scala.collection.immutable.List.foreach(List.scala:431)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:200)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:179)
        at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:88)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:179)
        at org.apache.spark.sql.execution.QueryExecution.$anonfun$optimizedPlan$1(QueryExecution.scala:126)
        at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
        at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:185)
        at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:510)
        at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:185)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
        at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:184)
        at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:122)
        at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:118)
        at org.apache.spark.sql.execution.QueryExecution.assertOptimized(QueryExecution.scala:136)
        at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:154)
        at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:151)
        at org.apache.spark.sql.execution.QueryExecution.simpleString(QueryExecution.scala:204)
        at org.apache.spark.sql.execution.QueryExecution.org$apache$spark$sql$execution$QueryExecution$$explainString(QueryExecution.scala:249)
        at org.apache.spark.sql.execution.QueryExecution.explainString(QueryExecution.scala:218)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:103)
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
        at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3856)
        at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:3685)
        at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:104)
        at java.base/java.lang.reflect.Method.invoke(Method.java:578)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
        at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
        at java.base/java.lang.Thread.run(Thread.java:1589)

I have the following packages installed:

  • spark-catalyst_2.12-3.3.2.jar
  • scala-library-2.10.0-M3.jar

Can anyone help me figure out what might be throwing this error?
Thanks in advance.
I have also tried adding these jars to the packages passed when launching pyspark:

  • spark-catalyst_2.12-3.3.2.jar
  • scala-library-2.10.0-M3.jar

The schema works fine.

>>> df.printSchema()
    root
     |-- _id: string (nullable = true)
     |-- x: integer (nullable = true)

The x column contains the values 1..20, so data should be displayed.
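The collection can be recreated with pymongo, for example; this is a minimal setup sketch, not necessarily how the data was originally inserted:

# Hypothetical: populate test.testData with x values 1..20 via pymongo
from pymongo import MongoClient

client = MongoClient("mongodb://127.0.0.1")
client.test.testData.insert_many([{"x": i} for i in range(1, 21)])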


eh57zj3b1#

This error is caused by a Scala version mismatch between pyspark and mongo-spark-connector: the `_2.13` suffix in the connector coordinates means it is built for Scala 2.13, while your Spark 3.3.2 install is built for Scala 2.12.
First, uninstall everything and install pyspark with:

pip install pyspark

Follow the official document for this step.
Then run

pyspark --version

to check which versions of Spark and Scala you are using.
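If you already have a session open, you can also check from Python; this is a minimal sketch that goes through Spark's internal py4j gateway, so treat it as a convenience rather than a stable API:

>>> spark.version                                                    # Spark version, e.g. '3.3.2'
>>> spark.sparkContext._jvm.scala.util.Properties.versionString()    # Scala version, e.g. 'version 2.12.15'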
For me it is Spark 3.3.2 and Scala 2.12.15. Use this information to pick the matching dependency.
In this case I would use:

pyspark --packages org.mongodb.spark:mongo-spark-connector_2.12:10.1.1

For the rest of the setup you can refer to this or any other tutorial.
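Putting it together, once the shell has been relaunched with the `_2.12` connector, a minimal session sketch (assuming the same local MongoDB and test.testData collection as in the question):

>>> df = spark.read.format("mongodb").option("connection.uri", "mongodb://127.0.0.1/test.testData").load()
>>> df.printSchema()
>>> df.show()   # should now return rows instead of the NoSuchMethodError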
