我想使用spark streaming从mongodb change streams读取数据[link at the end]。
这里要收集30秒转储,然后推入一些文件。我知道我可能需要编写一些定制接收器(使用pyspark)来接收来自相关数据源的数据,但是我找不到任何讨论使用python的spark流的定制接收器的文档。
下面的文档链接还提到了使用java或scala。
http://spark.apache.org/docs/latest/streaming-custom-receivers.html
我使用简单的python代码从changestreams读取数据,但它不能满足我的要求。
注意:在下面的代码中 change_stream
一个接一个地使用for循环(而是希望批量读取30秒时间范围内的文档,然后将其写入某个目标文件)
import os
import pymongo
from bson.json_util import dumps
STREAM_DB="mongodb://<username>:<pwd>@<host>:<port>/<database to be used> authSource=admin&retryWrites=true"
client = pymongo.MongoClient(STREAM_DB)
change_stream = client.<database name>.watch()
print(change_stream)
f = open("<filename>", "a")
for change in change_stream:
f.write(dumps(change) + '\n')
f.close()
我没有看到任何文档讨论使用python的spark流的定制接收器。下面的文档链接还提到了使用java或scala。
http://spark.apache.org/docs/latest/streaming-custom-receivers.html
有没有一种方法可以使用spark读取流式mongodb变更流数据。
[1] : https://docs.mongodb.com/manual/changestreams/
暂无答案!
目前还没有任何答案,快来回答吧!