我在org.bson.basicBondeCoder中得到“java.lang.illegalstateexception:not ready”。在尝试使用mongodb作为输入rdd时解码:
Configuration conf = new Configuration();
conf.set("mongo.input.uri", "mongodb://127.0.0.1:27017/test.input");
JavaPairRDD<Object, BSONObject> rdd = sc.newAPIHadoopRDD(conf, MongoInputFormat.class, Object.class, BSONObject.class);
System.out.println(rdd.count());
我得到的例外是:14/08/06 09:49:57 info rdd.newhadooprdd:input split:
MongoInputSplit{URI=mongodb://127.0.0.1:27017/test.input, authURI=null, min={ "_id" : { "$oid" : "53df98d7e4b0a67992b31f8d"}}, max={ "_id" : { "$oid" : "53df98d7e4b0a67992b331b8"}}, query={ }, sort={ }, fields={ }, notimeout=false} 14/08/06 09:49:57
WARN scheduler.TaskSetManager: Loss was due to java.lang.IllegalStateException
java.lang.IllegalStateException: not ready
at org.bson.BasicBSONDecoder._decode(BasicBSONDecoder.java:139)
at org.bson.BasicBSONDecoder.decode(BasicBSONDecoder.java:123)
at com.mongodb.hadoop.input.MongoInputSplit.readFields(MongoInputSplit.java:185)
at org.apache.hadoop.io.ObjectWritable.readObject(ObjectWritable.java:285)
at org.apache.hadoop.io.ObjectWritable.readFields(ObjectWritable.java:77)
at org.apache.spark.SerializableWritable.readObject(SerializableWritable.scala:42)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:88)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:55)
at java.lang.reflect.Method.invoke(Method.java:618)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1089)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1962)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1867)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1419)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2059)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1984)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1867)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1419)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:420)
at org.apache.spark.scheduler.ResultTask.readExternal(ResultTask.scala:147)
at java.io.ObjectInputStream.readExternalData(ObjectInputStream.java:1906)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1865)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1419)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:420)
at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:63)
at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:85)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:165)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1156)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:626)
at java.lang.Thread.run(Thread.java:804)
所有的程序输出都在这里
环境:
红帽
Spark1.0.1
hadoop 2.4.1版
mongodb 2.4.10版本
mongo-hadoop-1.3版本
3条答案
按热度按时间2nbm6dog1#
我发现了同样的问题。作为一种解决方法,我放弃了newapihadooprdd方法,实现了一种并行加载机制,它基于在文档id上定义间隔,然后并行加载每个分区。想法是通过使用mongodb java驱动程序实现以下mongo shell代码:
现在我们可以使用范围对集合片段执行快速查询。注意,最后一个片段需要区别对待,仅作为min约束,以避免丢失集合的最后一个文档。
我将其实现为一个简单范围pojo的java方法“linkedlist computeidranges(dbcollection coll,int rangesize)”,然后并行化集合并用flatmaptopair对其进行转换,以生成一个类似于newapihadooprdd返回的rdd。
您可以使用范围的大小和用于并行化的片的数量来控制并行的粒度。
我希望这有帮助,
问候语!
胡安·罗德í圭兹水平á
watbbzwu2#
我想我已经发现了这个问题:mongodb hadoop在core/src/main/java/com/mongodb/hadoop/input/mongoinputsplit.java中的bson编码器/解码器示例上有一个“静态”修饰符。当spark以多线程模式运行时,所有线程都会尝试使用相同的编码器/解码器示例进行反序列化,这显然会产生不好的结果。
我的github上的补丁(已经向上游提交了pull请求)
我现在可以从python运行8核多线程spark->mongo collection count()!
ewm0tg9j3#
在使用mongorestore导入bson文件之后,我遇到了相同的异常组合。调用db.collecion.reindex()为我解决了这个问题。