I wrote this simple script that connects to a MongoDB (3.6.2) server and reads a collection named "datascience.nypd". The dataset is fairly large (about 7 million documents).
I am using PySpark on top of Hadoop, and pymongo_spark as the interface between PySpark and MongoDB. If I try to read the first entry of another collection (which contains only a few documents), it works without any problem, but trying to access the main dataset results in the following error:
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.newAPIHadoopRDD.
: java.io.IOException: com.mongodb.hadoop.splitter.SplitFailedException: Failed to aggregate sample documents. Note that this Splitter implementation is incompatible with MongoDB versions prior to 3.2.
at com.mongodb.hadoop.MongoInputFormat.getSplits(MongoInputFormat.java:62)
at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:131)
at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:276)
at scala.Option.getOrElse(Option.scala:189)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:272)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:276)
at scala.Option.getOrElse(Option.scala:189)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:272)
at org.apache.spark.rdd.RDD.$anonfun$take$1(RDD.scala:1402)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:388)
at org.apache.spark.rdd.RDD.take(RDD.scala:1396)
at org.apache.spark.api.python.SerDeUtil$.pairRDDToPython(SerDeUtil.scala:239)
at org.apache.spark.api.python.PythonRDD$.newAPIHadoopRDD(PythonRDD.scala:373)
at org.apache.spark.api.python.PythonRDD.newAPIHadoopRDD(PythonRDD.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
Caused by: com.mongodb.hadoop.splitter.SplitFailedException: Failed to aggregate sample documents. Note that this Splitter implementation is incompatible with MongoDB versions prior to 3.2.
at com.mongodb.hadoop.splitter.SampleSplitter.calculateSplits(SampleSplitter.java:84)
at com.mongodb.hadoop.MongoInputFormat.getSplits(MongoInputFormat.java:60)
... 27 more
Caused by: com.mongodb.MongoCommandException: Command failed with error 9: 'The 'cursor' option is required, except for aggregate with the explain argument' on server localhost:27017. The full response is { "ok" : 0.0, "errmsg" : "The 'cursor' option is required, except for aggregate with the explain argument", "code" : 9, "codeName" : "FailedToParse" }
at com.mongodb.connection.ProtocolHelper.getCommandFailureException(ProtocolHelper.java:86)
at com.mongodb.connection.CommandProtocol.execute(CommandProtocol.java:120)
at com.mongodb.connection.DefaultServer$DefaultServerProtocolExecutor.execute(DefaultServer.java:159)
at com.mongodb.connection.DefaultServerConnection.executeProtocol(DefaultServerConnection.java:286)
at com.mongodb.connection.DefaultServerConnection.command(DefaultServerConnection.java:173)
at com.mongodb.operation.CommandOperationHelper.executeWrappedCommandProtocol(CommandOperationHelper.java:215)
at com.mongodb.operation.CommandOperationHelper.executeWrappedCommandProtocol(CommandOperationHelper.java:206)
at com.mongodb.operation.CommandOperationHelper.executeWrappedCommandProtocol(CommandOperationHelper.java:112)
at com.mongodb.operation.AggregateOperation$1.call(AggregateOperation.java:227)
at com.mongodb.operation.AggregateOperation$1.call(AggregateOperation.java:223)
at com.mongodb.operation.OperationHelper.withConnectionSource(OperationHelper.java:239)
at com.mongodb.operation.OperationHelper.withConnection(OperationHelper.java:212)
at com.mongodb.operation.AggregateOperation.execute(AggregateOperation.java:223)
at com.mongodb.operation.AggregateOperation.execute(AggregateOperation.java:65)
at com.mongodb.Mongo.execute(Mongo.java:772)
at com.mongodb.Mongo$2.execute(Mongo.java:759)
at com.mongodb.DBCollection.aggregate(DBCollection.java:1377)
at com.mongodb.DBCollection.aggregate(DBCollection.java:1308)
at com.mongodb.DBCollection.aggregate(DBCollection.java:1294)
at com.mongodb.hadoop.splitter.SampleSplitter.calculateSplits(SampleSplitter.java:82)
... 28 more
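As far as I can tell, the root cause is the nested MongoCommandException: MongoDB 3.6 rejects an aggregate command that carries no cursor document, and the aggregate issued by mongo-hadoop's SampleSplitter apparently reaches the server without one. Below is a minimal pymongo sketch (an assumption on my part, run against the same localhost server) of the failing and the working form of that command:

from pymongo import MongoClient

client = MongoClient('mongodb://localhost:27017')
db = client.datascience

# On MongoDB 3.6 the legacy form, without a 'cursor' document, fails with
# the same error 9 ("The 'cursor' option is required, ..."):
#   db.command('aggregate', 'nypd', pipeline=[{'$sample': {'size': 1}}])

# Supplying a (possibly empty) cursor document makes the same aggregation work:
result = db.command('aggregate', 'nypd',
                    pipeline=[{'$sample': {'size': 1}}],
                    cursor={})
print(result['cursor']['firstBatch'])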
Here is the code:
import findspark
findspark.init()

from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql.types import StructType, StructField, StringType
import pymongo_spark

# Adds the mongoRDD / saveToMongoDB helpers to SparkContext and RDD.
pymongo_spark.activate()


def main():
    conf = SparkConf().setAppName('pyspark test')
    sc = SparkContext(conf=conf)
    spark = SparkSession(sc)

    # Read the whole collection as an RDD of documents.
    mongo_rdd = sc.mongoRDD('mongodb://localhost:27017/datascience.nypd')
    sqlcontext = SQLContext(sc)

    schema = StructType([StructField('ID', StringType()),
                         StructField('DATE', StringType()),
                         StructField('TIME', StringType()),
                         StructField('PRECINCT', StringType()),
                         StructField('OFFENSE_CODE', StringType()),
                         StructField('OFFENSE_DESCRIPTION', StringType()),
                         StructField('CRIME_OUTCOME', StringType()),
                         StructField('LEVEL_OFFENSE', StringType()),
                         StructField('BOROUGH', StringType()),
                         StructField('LATITUDE', StringType()),
                         StructField('LONGITUDE', StringType()),
                         StructField('AGE', StringType()),
                         StructField('RACE', StringType()),
                         StructField('SEX', StringType())])

    # project() maps each document to a JSON string; its definition is not shown here.
    projected_rdd = mongo_rdd.map(project)
    train_df = sqlcontext.read.json(projected_rdd)
    print(train_df.first())

    sc.stop()


if __name__ == '__main__':
    main()
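(For completeness: I assume the collection could also be read through the official MongoDB Spark connector instead of mongo-hadoop/pymongo_spark, which would sidestep the SampleSplitter entirely; a rough sketch of that, assuming a matching mongo-spark-connector package is on the Spark classpath, is below. I would still like to understand why the mongo-hadoop path fails, though.)

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName('mongo spark connector test')
         .config('spark.mongodb.input.uri',
                 'mongodb://localhost:27017/datascience.nypd')
         .getOrCreate())

# Loads the collection directly into a DataFrame with an inferred schema.
df = spark.read.format('com.mongodb.spark.sql.DefaultSource').load()
print(df.first())

spark.stop()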