使用hadoop reducer在mongo中执行bulkwriteoperation时检查重复记录

i2byvkas  于 2021-05-30  发布在  Hadoop
关注(0)|答案(1)|浏览(389)

我正在使用hadoop map reduce处理xml文件。我直接将json数据存储到mongodb中。
如何实现在执行前只将不重复的记录存储到数据库中 BulkWriteOperation ?
重复记录标准将基于产品图像和产品名称,我不想使用morphia层,在这里我们可以为类成员分配索引。
这是我的课程:

public class XMLReducer extends Reducer<Text, MapWritable, Text, NullWritable>{

private static final Logger LOGGER = Logger.getLogger(XMLReducer.class);    

protected void reduce(Text key, Iterable<MapWritable> values, Context ctx) throws IOException, InterruptedException{
    LOGGER.info("reduce()------Start for key>"+key);
    Map<String,String> insertProductInfo = new HashMap<String,String>();
    try{
        MongoClient mongoClient = new MongoClient("localhost", 27017);
        DB db = mongoClient.getDB("test");
        BulkWriteOperation operation = db.getCollection("product").initializeOrderedBulkOperation();
        for (MapWritable entry : values) {
             for (Entry<Writable, Writable> extractProductInfo : entry.entrySet()) {
                    insertProductInfo.put(extractProductInfo.getKey().toString(), extractProductInfo.getValue().toString());
                }
             if(!insertProductInfo.isEmpty()){
                 BasicDBObject basicDBObject = new BasicDBObject(insertProductInfo);
                 operation.insert(basicDBObject);
             }          
        }
        //How can I check for duplicates before executing bulk operation
        operation.execute();
        LOGGER.info("reduce------end for key"+key);
    }catch(Exception e){
        LOGGER.error("General Exception in XMLReducer",e);
    }
  } 
}

编辑:在建议的答案之后,我添加了:

BasicDBObject query = new BasicDBObject("product_image", basicDBObject.get("product_image"))
                 .append("product_name", basicDBObject.get("product_name"));
                 operation.find(query).upsert().updateOne(new BasicDBObject("$setOnInsert", basicDBObject));
 operation.insert(basicDBObject);

我犯了这样的错误: com.mongodb.MongoInternalException: no mapping found for index 0 任何帮助都会有用的。谢谢。

3bygqnnd

3bygqnnd1#

我想这一切都取决于你真正想对这里的“复制品”做什么,以及你如何处理它。
一个你可以随时使用的 .initializeUnOrderedBulkOperation() 它不会对索引中的重复键“出错”(您需要停止重复键),但会在返回的 BulkWriteResult 对象。它是从 .execute() ```
BulkWriteResult result = operation.execute();

另一方面,您可以只使用“upserts”来代替,并使用诸如 `$setOnInsert` 仅在不存在重复项的情况下进行更改:

BasicDBObject basicdbobject = new BasicDBObject(insertProductInfo);
BasicDBObject query = new BasicDBObject("key", basicdbobject.get("key"));

operation.find(query).upsert().updateOne(new BasicDBObject("$setOnInsert", basicdbobject));

因此,您基本上是通过查询查找保存“key”的字段的值来确定是否存在重复项,然后只实际更改未找到该“key”的任何数据,从而创建一个新文档并“插入”。
在任何一种情况下,这里的默认行为都是“插入”第一个唯一的“key”值,然后忽略所有其他事件。如果要在找到相同键的位置执行其他操作,如“覆盖”或“增量”值,则 `.update()` “upsert”方法是您想要的方法,但是您将使用其他update操作符来执行这些操作。

相关问题