PySpark cannot correctly read from MongoDB

khbbv19g · posted 2021-05-31 in Hadoop

I wrote this simple script that connects to a MongoDB (3.6.2) server and reads a collection called "datascience.nypd". The dataset is fairly large (roughly 7 million documents).

I am running PySpark on top of Hadoop, with pymongo_spark as the interface between PySpark and MongoDB. If I read the first entry of another collection (one that only holds a few documents), it works without any problem, but trying to access the main dataset produces the following error:

py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.newAPIHadoopRDD.
: java.io.IOException: com.mongodb.hadoop.splitter.SplitFailedException: Failed to aggregate sample documents. Note that this Splitter implementation is incompatible with MongoDB versions prior to 3.2.
        at com.mongodb.hadoop.MongoInputFormat.getSplits(MongoInputFormat.java:62)
        at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:131)
        at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:276)
        at scala.Option.getOrElse(Option.scala:189)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:272)
        at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
        at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:276)
        at scala.Option.getOrElse(Option.scala:189)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:272)
        at org.apache.spark.rdd.RDD.$anonfun$take$1(RDD.scala:1402)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:388)
        at org.apache.spark.rdd.RDD.take(RDD.scala:1396)
        at org.apache.spark.api.python.SerDeUtil$.pairRDDToPython(SerDeUtil.scala:239)
        at org.apache.spark.api.python.PythonRDD$.newAPIHadoopRDD(PythonRDD.scala:373)
        at org.apache.spark.api.python.PythonRDD.newAPIHadoopRDD(PythonRDD.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.lang.Thread.run(Thread.java:748)
Caused by: com.mongodb.hadoop.splitter.SplitFailedException: Failed to aggregate sample documents. Note that this Splitter implementation is incompatible with MongoDB versions prior to 3.2.
        at com.mongodb.hadoop.splitter.SampleSplitter.calculateSplits(SampleSplitter.java:84)
        at com.mongodb.hadoop.MongoInputFormat.getSplits(MongoInputFormat.java:60)
        ... 27 more
Caused by: com.mongodb.MongoCommandException: Command failed with error 9: 'The 'cursor' option is required, except for aggregate with the explain argument' on server localhost:27017. The full response is { "ok" : 0.0, "errmsg" : "The 'cursor' option is required, except for aggregate with the explain argument", "code" : 9, "codeName" : "FailedToParse" }
        at com.mongodb.connection.ProtocolHelper.getCommandFailureException(ProtocolHelper.java:86)
        at com.mongodb.connection.CommandProtocol.execute(CommandProtocol.java:120)
        at com.mongodb.connection.DefaultServer$DefaultServerProtocolExecutor.execute(DefaultServer.java:159)
        at com.mongodb.connection.DefaultServerConnection.executeProtocol(DefaultServerConnection.java:286)
        at com.mongodb.connection.DefaultServerConnection.command(DefaultServerConnection.java:173)
        at com.mongodb.operation.CommandOperationHelper.executeWrappedCommandProtocol(CommandOperationHelper.java:215)
        at com.mongodb.operation.CommandOperationHelper.executeWrappedCommandProtocol(CommandOperationHelper.java:206)
        at com.mongodb.operation.CommandOperationHelper.executeWrappedCommandProtocol(CommandOperationHelper.java:112)
        at com.mongodb.operation.AggregateOperation$1.call(AggregateOperation.java:227)
        at com.mongodb.operation.AggregateOperation$1.call(AggregateOperation.java:223)
        at com.mongodb.operation.OperationHelper.withConnectionSource(OperationHelper.java:239)
        at com.mongodb.operation.OperationHelper.withConnection(OperationHelper.java:212)
        at com.mongodb.operation.AggregateOperation.execute(AggregateOperation.java:223)
        at com.mongodb.operation.AggregateOperation.execute(AggregateOperation.java:65)
        at com.mongodb.Mongo.execute(Mongo.java:772)
        at com.mongodb.Mongo$2.execute(Mongo.java:759)
        at com.mongodb.DBCollection.aggregate(DBCollection.java:1377)
        at com.mongodb.DBCollection.aggregate(DBCollection.java:1308)
        at com.mongodb.DBCollection.aggregate(DBCollection.java:1294)
        at com.mongodb.hadoop.splitter.SampleSplitter.calculateSplits(SampleSplitter.java:82)
        ... 28 more
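
The root cause in the trace appears to be that MongoDB 3.6 only accepts the aggregate command when it carries a 'cursor' option, while the $sample aggregation issued by the SampleSplitter omits it. For illustration only (this is not part of the failing job; it just assumes the same local MongoDB 3.6 instance and the datascience.nypd collection), the same server error can be reproduced directly with pymongo:

from pymongo import MongoClient
from pymongo.errors import OperationFailure

client = MongoClient('mongodb://localhost:27017')
db = client['datascience']

# Without a 'cursor' document, MongoDB 3.6 rejects the raw aggregate command
# with error code 9, just like in the trace above:
try:
    db.command({'aggregate': 'nypd', 'pipeline': [{'$sample': {'size': 10}}]})
except OperationFailure as err:
    print(err)

# The same $sample aggregation succeeds once an (empty) cursor document is added:
print(db.command({'aggregate': 'nypd',
                  'pipeline': [{'$sample': {'size': 10}}],
                  'cursor': {}}))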

Here is the code:

import findspark
findspark.init()

from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql.types import StructType, StructField, StringType
import pymongo_spark
pymongo_spark.activate()


def main():
    conf = SparkConf().setAppName('pyspark test')
    sc = SparkContext(conf=conf)
    spark = SparkSession(sc)

    # Read the collection as an RDD of documents via pymongo_spark
    mongo_rdd = sc.mongoRDD('mongodb://localhost:27017/datascience.nypd')
    sqlcontext = SQLContext(sc)

    # Every field is treated as a string
    schema = StructType([StructField('ID', StringType()),
                         StructField('DATE', StringType()),
                         StructField('TIME', StringType()),
                         StructField('PRECINCT', StringType()),
                         StructField('OFFENSE_CODE', StringType()),
                         StructField('OFFENSE_DESCRIPTION', StringType()),
                         StructField('CRIME_OUTCOME', StringType()),
                         StructField('LEVEL_OFFENSE', StringType()),
                         StructField('BOROUGH', StringType()),
                         StructField('LATITUDE', StringType()),
                         StructField('LONGITUDE', StringType()),
                         StructField('AGE', StringType()),
                         StructField('RACE', StringType()),
                         StructField('SEX', StringType())
                         ])

    # project() is defined elsewhere (not shown here)
    projected_rdd = mongo_rdd.map(project)
    train_df = sqlcontext.read.json(projected_rdd)
    print(train_df.first())
    sc.stop()


if __name__ == '__main__':
    main()
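
Note: the project function used in the map above is not shown; presumably it is a helper that turns each MongoDB document into a JSON string so that sqlcontext.read.json can parse it. A minimal hypothetical sketch of such a helper, assuming all schema fields are simply serialized as strings, would be:

import json

def project(doc):
    # Hypothetical placeholder: keep only the fields named in the schema above
    # and serialize them to a JSON string for sqlcontext.read.json().
    fields = ['ID', 'DATE', 'TIME', 'PRECINCT', 'OFFENSE_CODE', 'OFFENSE_DESCRIPTION',
              'CRIME_OUTCOME', 'LEVEL_OFFENSE', 'BOROUGH', 'LATITUDE', 'LONGITUDE',
              'AGE', 'RACE', 'SEX']
    return json.dumps({name: str(doc.get(name, '')) for name in fields})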

No answers yet.

