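I am processing XML files with Hadoop MapReduce and storing the resulting JSON data directly in MongoDB.
How can I make sure that only non-duplicate records are stored in the database before executing the BulkWriteOperation?
The duplicate criteria are based on the product image and the product name. I don't want to use the Morphia layer, where we could assign indexes to class members.
Here is my class: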
    public class XMLReducer extends Reducer<Text, MapWritable, Text, NullWritable>{
        private static final Logger LOGGER = Logger.getLogger(XMLReducer.class);
        protected void reduce(Text key, Iterable<MapWritable> values, Context ctx) throws IOException, InterruptedException{
            LOGGER.info("reduce()------Start for key>"+key);
            Map<String,String> insertProductInfo = new HashMap<String,String>();
            try{
                MongoClient mongoClient = new MongoClient("localhost", 27017);
                DB db = mongoClient.getDB("test");
                BulkWriteOperation operation = db.getCollection("product").initializeOrderedBulkOperation();
                for (MapWritable entry : values) {
                    for (Entry<Writable, Writable> extractProductInfo : entry.entrySet()) {
                        insertProductInfo.put(extractProductInfo.getKey().toString(), extractProductInfo.getValue().toString());
                    }
                    if(!insertProductInfo.isEmpty()){
                        BasicDBObject basicDBObject = new BasicDBObject(insertProductInfo);
                        operation.insert(basicDBObject);
                    }
                }
                //How can I check for duplicates before executing bulk operation
                operation.execute();
                LOGGER.info("reduce------end for key"+key);
            }catch(Exception e){
                LOGGER.error("General Exception in XMLReducer",e);
            }
        }
    }
Edit: following the suggested answer, I added:
BasicDBObject query = new BasicDBObject("product_image", basicDBObject.get("product_image"))
.append("product_name", basicDBObject.get("product_name"));
operation.find(query).upsert().updateOne(new BasicDBObject("$setOnInsert", basicDBObject));
operation.insert(basicDBObject);
I get this error: com.mongodb.MongoInternalException: no mapping found for index 0
Any help would be appreciated. Thanks.
1 Answer
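I suppose it all depends on what you really want to do with the "duplicates" here, since that determines how you handle them.
For one, you can always use .initializeUnorderedBulkOperation(), which will not stop at the first duplicate-key "error" raised by your unique index (an index you need anyway in order to stop duplicates). Instead it processes the rest of the batch and reports any such errors in the BulkWriteResult object returned from .execute(). Depending on the driver version, the Java driver may instead surface these failures as a BulkWriteException thrown by .execute(), which carries the individual write errors.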
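As a complement to the database-side options, duplicates that arrive within a single reduce() call can also be dropped in memory before they are queued on the bulk operation. The following is a minimal sketch in plain Java, assuming each record is a map with the product_image and product_name keys used in the question. The class ProductDeduplicator and its methods are hypothetical names introduced for illustration. It does not catch duplicates across different reduce() calls or documents already in the collection, so a unique index is still needed for that.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical helper: filters records by (product_image, product_name).
public class ProductDeduplicator {

    // Keeps the first record seen for each (product_image, product_name) pair.
    public static List<Map<String, String>> dedupe(Iterable<Map<String, String>> products) {
        Set<List<String>> seen = new HashSet<>();
        List<Map<String, String>> unique = new ArrayList<>();
        for (Map<String, String> product : products) {
            // A two-element list works as a composite key: equals/hashCode compare both fields.
            List<String> key = Arrays.asList(product.get("product_image"), product.get("product_name"));
            if (seen.add(key)) { // add() returns false when the key was already present
                unique.add(product);
            }
        }
        return unique;
    }

    // Small factory for sample records (the "price" field is an assumption for the demo).
    public static Map<String, String> product(String image, String name, String price) {
        Map<String, String> p = new HashMap<>();
        p.put("product_image", image);
        p.put("product_name", name);
        p.put("price", price);
        return p;
    }

    public static void main(String[] args) {
        List<Map<String, String>> input = new ArrayList<>();
        input.add(product("lamp.png", "Lamp", "10"));
        input.add(product("lamp.png", "Lamp", "12")); // same image and name: dropped
        input.add(product("desk.png", "Desk", "80"));
        System.out.println(dedupe(input).size()); // keeps 2 of the 3 records
    }
}
```

In the reducer, each surviving map could then be wrapped in a BasicDBObject and queued on the bulk operation as before.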
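```java
// Option 1: with an unordered bulk operation, inspect the outcome of execute().
BulkWriteResult result = operation.execute();

// Option 2: queue upserts instead of inserts. $setOnInsert writes the fields only
// when no document matches the query, so existing records are left untouched.
BasicDBObject basicdbobject = new BasicDBObject(insertProductInfo);
BasicDBObject query = new BasicDBObject("key", basicdbobject.get("key"));
operation.find(query).upsert().updateOne(new BasicDBObject("$setOnInsert", basicdbobject));
```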