pyspark自定义接收器使用spark流读取mongo变更流日志

30byixjq  于 2021-07-14  发布在  Spark
关注(0)|答案(0)|浏览(248)

我想使用spark streaming从mongodb change streams读取数据[link at the end]。
这里要收集30秒转储,然后推入一些文件。我知道我可能需要编写一些定制接收器(使用pyspark)来接收来自相关数据源的数据,但是我找不到任何讨论使用python的spark流的定制接收器的文档。
下面的文档链接还提到了使用java或scala。
http://spark.apache.org/docs/latest/streaming-custom-receivers.html
我使用简单的python代码从changestreams读取数据,但它不能满足我的要求。
注意:在下面的代码中 change_stream 一个接一个地使用for循环(而是希望批量读取30秒时间范围内的文档,然后将其写入某个目标文件)

  1. import os
  2. import pymongo
  3. from bson.json_util import dumps
  4. STREAM_DB="mongodb://<username>:<pwd>@<host>:<port>/<database to be used> authSource=admin&retryWrites=true"
  5. client = pymongo.MongoClient(STREAM_DB)
  6. change_stream = client.<database name>.watch()
  7. print(change_stream)
  8. f = open("<filename>", "a")
  9. for change in change_stream:
  10. f.write(dumps(change) + '\n')
  11. f.close()

我没有看到任何文档讨论使用python的spark流的定制接收器。下面的文档链接还提到了使用java或scala。
http://spark.apache.org/docs/latest/streaming-custom-receivers.html
有没有一种方法可以使用spark读取流式mongodb变更流数据。
[1] : https://docs.mongodb.com/manual/changestreams/

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题