更好的解决方案,可以为sparkDataframe使用任何消息代理

nukf8bse  于 2021-06-07  发布在  Kafka
关注(0)|答案(0)|浏览(270)

我正在运行一个算法来标记mongo字段,并在此基础上向该文档添加新字段。由于我的收藏数量在100万左右,因此更新和插入要花很多时间。
样本数据:

{id:'a1',content:'some text1'}
    {id:'a2',content:'some text2'}

python代码:

docs= db.col.find({})
 for doc in docs:
     out = do_operation(doc['content']) //do_operation is my algorithm
     doc["tag"]=out
     db.col.update(id:doc['id'],$set:{'Tag_flag':TRUE})
     db.col2.insert(doc)

虽然我使用了sparkDataframe来提高速度,但是sparkDataframe占用了大量内存并抛出内存错误(配置:4核和16gb ram(在hadoop的单个集群上)

df = //loading mongodata to a dataframe
 df1 = df.withColumn('tag',df.content)
 output = []     
 for doc in df.rdd.collect():
    out = do_operation(doc['content'])
    output.append(out)  
 df2 = spark.createDataFrame(output)
 final_df = df1.join(df2, df1._id == df2._id , 'inner')
 //and finally inserting this dataframe into new collection.

我需要优化我的sparkcode,这样我可以用更少的内存加速。
我可以在mongo和spark之间使用任何消息代理,比如kafka、rabbitmq或reddis。
会有帮助吗?

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题