让spark、python和mongodb协同工作

y53ybaqx  于 2021-05-29  发布在  Hadoop
关注(0)|答案(4)|浏览(449)

我很难把这些部件正确地连接起来。我有Spark安装和工作成功,我可以运行作业本地,独立,也可以通过Yarn。我(据我所知)在这里和这里遵循了建议的步骤
我正在开发ubuntu,我拥有的各种组件版本
spark-1.5.1-bin-hadoop2.6
hadoop hadoop-2.6.1版本
蒙哥2.6.10
mongo hadoop连接器克隆自https://github.com/mongodb/mongo-hadoop.git
python 2.7.10版
我在执行各种步骤时遇到了一些困难,例如要将哪些jar添加到哪个路径,所以我添加的是
/usr/local/share/hadoop-2.6.1/share/hadoop/mapreduce 我补充说 mongo-hadoop-core-1.5.0-SNAPSHOT.jar 以下环境变量 export HADOOP_HOME="/usr/local/share/hadoop-2.6.1" export PATH=$PATH:$HADOOP_HOME/bin export SPARK_HOME="/usr/local/share/spark-1.5.1-bin-hadoop2.6" export PYTHONPATH="/usr/local/share/mongo-hadoop/spark/src/main/python" export PATH=$PATH:$SPARK_HOME/bin 我的python程序是basic

from pyspark import SparkContext, SparkConf
import pymongo_spark
pymongo_spark.activate()

def main():
    conf = SparkConf().setAppName("pyspark test")
    sc = SparkContext(conf=conf)
    rdd = sc.mongoRDD(
        'mongodb://username:password@localhost:27017/mydb.mycollection')

if __name__ == '__main__':
    main()

我正在用命令运行它

$SPARK_HOME/bin/spark-submit --driver-class-path /usr/local/share/mongo-hadoop/spark/build/libs/ --master local[4] ~/sparkPythonExample/SparkPythonExample.py

我得到了以下结果

Traceback (most recent call last):
  File "/home/me/sparkPythonExample/SparkPythonExample.py", line 24, in <module>
    main()
  File "/home/me/sparkPythonExample/SparkPythonExample.py", line 17, in main
    rdd = sc.mongoRDD('mongodb://username:password@localhost:27017/mydb.mycollection')
  File "/usr/local/share/mongo-hadoop/spark/src/main/python/pymongo_spark.py", line 161, in mongoRDD
    return self.mongoPairRDD(connection_string, config).values()
  File "/usr/local/share/mongo-hadoop/spark/src/main/python/pymongo_spark.py", line 143, in mongoPairRDD
    _ensure_pickles(self)
  File "/usr/local/share/mongo-hadoop/spark/src/main/python/pymongo_spark.py", line 80, in _ensure_pickles
    orig_tb)
py4j.protocol.Py4JError

根据这里
当java客户机代码中发生异常时,会引发此异常。例如,如果您尝试从空堆栈中弹出一个元素。抛出的java异常示例存储在java\u异常成员中。
查看源代码 pymongo_spark.py 它说,这条线抛出了错误
“与jvm通信时出错。mongodb spark jar在spark的类路径上吗?“
因此,作为回应,我试图确保正确的jar被传递,但我可能会做错这一切,见下文

$SPARK_HOME/bin/spark-submit --jars /usr/local/share/spark-1.5.1-bin-hadoop2.6/lib/mongo-hadoop-spark-1.5.0-SNAPSHOT.jar,/usr/local/share/spark-1.5.1-bin-hadoop2.6/lib/mongo-java-driver-3.0.4.jar --driver-class-path /usr/local/share/spark-1.5.1-bin-hadoop2.6/lib/mongo-java-driver-3.0.4.jar,/usr/local/share/spark-1.5.1-bin-hadoop2.6/lib/mongo-hadoop-spark-1.5.0-SNAPSHOT.jar --master local[4] ~/sparkPythonExample/SparkPythonExample.py

我已经进口了 pymongo 到同一个python程序来验证我至少可以使用它访问mongodb,而且我可以。
我知道这里有很多活动部件,所以如果我能提供更多有用的信息请告诉我。

mqkwyuun

mqkwyuun1#

你能试着用吗 --package 选项而不是 --jars ... 在spark submit命令中:

spark-submit --packages org.mongodb.mongo-hadoop:mongo-hadoop-core:1.3.1,org.mongodb:mongo-java-driver:3.1.0 [REST OF YOUR OPTIONS]

其中一些jar文件不是uberjar,需要下载更多依赖项才能开始工作。

wd2eg0qa

wd2eg0qa2#

更新:
2016-07-04
自从上次更新mongodb spark connector以来,它已经成熟了很多。它提供了最新的二进制文件和基于数据源的api,但它使用的是 SparkConf 因此它在主观上不如stratio/spark mongodb灵活。
2016-03-30
从最初的答案开始,我找到了两种不同的方法从spark连接到mongodb:
mongodb/mongo spark公司
stratio/spark mongodb公司
虽然前者看起来相对不成熟,但后者看起来比mongo-hadoop连接器更好,并提供了sparksqlapi。


# Adjust Scala and package version according to your setup

# although officially 0.11 supports only Spark 1.5

# I haven't encountered any issues on 1.6.1

bin/pyspark --packages com.stratio.datasource:spark-mongodb_2.11:0.11.0
df = (sqlContext.read
  .format("com.stratio.datasource.mongodb")
  .options(host="mongo:27017", database="foo", collection="bar")
  .load())

df.show()

## +---+----+--------------------+

## |  x|   y|                 _id|

## +---+----+--------------------+

## |1.0|-1.0|56fbe6f6e4120712c...|

## |0.0| 4.0|56fbe701e4120712c...|

## +---+----+--------------------+

似乎比以前稳定多了 mongo-hadoop-spark ,支持 predicate 下推,无需静态配置,工作简单。
最初的答案是:
事实上,这里有相当多的运动部件。我试图通过构建一个简单的docker映像使其更易于管理,该映像与所描述的配置大致匹配(不过为了简洁起见,我省略了hadoop库)。你可以在上找到完整的源代码 GitHub (doi 10.5281/zenodo.47882)从头开始:

git clone https://github.com/zero323/docker-mongo-spark.git
cd docker-mongo-spark
docker build -t zero323/mongo-spark .

或者下载一张我推送到docker hub的图片,这样你就可以 docker pull zero323/mongo-spark ):
开始图像:

docker run -d --name mongo mongo:2.6
docker run -i -t --link mongo:mongo zero323/mongo-spark /bin/bash

开始Spark壳传递 --jars 以及 --driver-class-path :

pyspark --jars ${JARS} --driver-class-path ${SPARK_DRIVER_EXTRA_CLASSPATH}

最后看看它是如何工作的:

import pymongo
import pymongo_spark

mongo_url = 'mongodb://mongo:27017/'

client = pymongo.MongoClient(mongo_url)
client.foo.bar.insert_many([
    {"x": 1.0, "y": -1.0}, {"x": 0.0, "y": 4.0}])
client.close()

pymongo_spark.activate()
rdd = (sc.mongoRDD('{0}foo.bar'.format(mongo_url))
    .map(lambda doc: (doc.get('x'), doc.get('y'))))
rdd.collect()

## [(1.0, -1.0), (0.0, 4.0)]

请注意,mongohadoop似乎在第一次操作之后关闭了连接。比如说打电话 rdd.count() collect之后将引发异常。
基于我在创作这张照片时遇到的不同问题,我倾向于相信 mongo-hadoop-1.5.0-SNAPSHOT.jar 以及 mongo-hadoop-spark-1.5.0-SNAPSHOT.jar 对双方 --jars 以及 --driver-class-path 是唯一的硬性要求。
笔记:
这张图片大致基于jaceklaskowski/docker spark,所以请务必发送一些好的因果报应给@jaceklaskowski,如果它有帮助的话。
如果不需要包含新api的开发版本,则使用 --packages 很可能是更好的选择。

nszi6y05

nszi6y053#

我昨天也有同样的问题。通过放置 mongo-java-driver.jar$HADOOP_HOME/lib 以及 mongo-hadoop-core.jar 以及 mongo-hadoop-spark.jar$HADOOP_HOME/spark/classpath/emr (或 $SPARK_CLASSPATH ).
如果有帮助请告诉我。

zlhcx6iw

zlhcx6iw4#

祝你好运!
@看到了吗https://github.com/mongodb/mongo-hadoop/wiki/spark-usage

from pyspark import SparkContext, SparkConf

import pymongo_spark

# Important: activate pymongo_spark.

pymongo_spark.activate()

def main():
    conf = SparkConf().setAppName("pyspark test")
    sc = SparkContext(conf=conf)

    # Create an RDD backed by the MongoDB collection.
    # This RDD *does not* contain key/value pairs, just documents.
    # If you want key/value pairs, use the mongoPairRDD method instead.
    rdd = sc.mongoRDD('mongodb://localhost:27017/db.collection')

    # Save this RDD back to MongoDB as a different collection.
    rdd.saveToMongoDB('mongodb://localhost:27017/db.other.collection')

    # You can also read and write BSON:
    bson_rdd = sc.BSONFileRDD('/path/to/file.bson')
    bson_rdd.saveToBSON('/path/to/bson/output')

if __name__ == '__main__':
    main()

相关问题