我有一个有列的电影数据库 userId
, movieId
, movie-categoryId
, reviewId
, movieRating
以及 reviewDate
.
在我的Map器中,我想提取userid->(movieid,movierating)
然后在reducer中,我想把所有的movieid,movierating对按用户分组。
以下是我的尝试:
Map功能:
var map = function() {
var values={movieId : this.movieId, movieRating : this.movieRating};
emit(this.userId, values);}
减少功能:
var reduce = function(key,values) {
var ratings = [];
values.forEach(function(V){
var temp = {movieId : V.movieId, movieRating : V.movieRating};
Array.prototype.push.apply(ratings, temp);
});
return {userId : key, ratings : ratings };
}
运行mapreduce:
db.ratings.mapReduce(map, reduce, { out: "map_reduce_step1" })
输出: db.map_reduce_step1.find()
```
{ "_id" : 1, "value" : { "userId" : 1, "ratings" : [ ] } }
{ "_id" : 2, "value" : { "userId" : 2, "ratings" : [ ] } }
{ "_id" : 3, "value" : { "userId" : 3, "ratings" : [ ] } }
{ "_id" : 4, "value" : { "userId" : 4, "ratings" : [ ] } }
{ "_id" : 5, "value" : { "userId" : 5, "ratings" : [ ] } }
{ "_id" : 6, "value" : { "userId" : 6, "ratings" : [ ] } }
{ "_id" : 7, "value" : { "userId" : 7, "ratings" : [ ] } }
{ "_id" : 8, "value" : { "userId" : 8, "ratings" : [ ] } }
{ "_id" : 9, "value" : { "userId" : 9, "ratings" : [ ] } }
{ "_id" : 10, "value" : { "userId" : 10, "ratings" : [ ] } }
{ "_id" : 11, "value" : { "userId" : 11, "ratings" : [ ] } }
{ "_id" : 12, "value" : { "userId" : 12, "ratings" : [ ] } }
{ "_id" : 13, "value" : { "userId" : 13, "ratings" : [ ] } }
{ "_id" : 14, "value" : { "userId" : 14, "ratings" : [ ] } }
{ "_id" : 15, "value" : { "movieId" : 1, "movieRating" : 3 } }
{ "_id" : 16, "value" : { "userId" : 16, "ratings" : [ ] } }
我没有得到预期的产出。事实上,这个输出对我来说毫无意义!
下面是我在reducer中尝试做的python等价物(以防上面没有明确reducer的用途):
def reducer_ratings_by_user(self, user_id, itemRatings):
#Group (item, rating) pairs by userID
ratings = []
for movieID, rating in itemRatings:
ratings.append((movieID, rating))
yield user_id, ratings
编辑1@chridam
以下是我在这里真正想做的事情的概要:
movies.csv文件如下所示:
userid,movieid,moviecategoryid,reviewid,movierating,reviewdate
1,1,1,1,5,7/12/2000
2,1,1,2,5,7/12/2000
3,1,1,3,5,7/12/2000
4,1,1,4,4,7/12/2000
5,1,1,5,4,7/12/2000
6,1,1,6,5,7/15/2000
1,2,1,7,4,7/25/2000
8,1,1,8,4,7/28/2000
9,1,1,9,3,8/3/2000
...
...
我将其导入mongodb:
mongoimport --db SomeName --collection ratings --type csv --headerline --file Movies.csv
然后我尝试应用上面定义的map reduce函数。之后,我将通过执行以下操作将其导出回csv:
mongoexport --db SomeName --collection map_reduce_step1 --csv --out movie_ratings_out.csv --fields ...
这个 `movie_ratings_out.csv` 文件应如下所示:
userid,movieid1,rating1,movieid2,rating2,。。。
1,1,5,2,4
...
...
因此,每一行都包含每个用户的所有(电影、分级)对。
编辑2
样品:
db.ratings.find().pretty()
{
"_id" : ObjectId("57f4a0dd9cb74fc4d344a40f"),
"userId" : 4,
"movieId" : 1,
"movie-categoryId" : 1,
"reviewId" : 4,
"movieRating" : 4,
"reviewDate" : "7/12/2000"
}
{
"_id" : ObjectId("57f4a0dd9cb74fc4d344a410"),
"userId" : 5,
"movieId" : 1,
"movie-categoryId" : 1,
"reviewId" : 5,
"movieRating" : 4,
"reviewDate" : "7/12/2000"
}
{
"_id" : ObjectId("57f4a0dd9cb74fc4d344a411"),
"userId" : 4,
"movieId" : 2,
"movie-categoryId" : 1,
"reviewId" : 6,
"movieRating" : 5,
"reviewDate" : "7/15/2000"
}
{
"_id" : ObjectId("57f4a0dd9cb74fc4d344a412"),
"userId" : 4,
"movieId" : 3,
"movie-categoryId" : 1,
"reviewId" : 2,
"movieRating" : 5,
"reviewDate" : "7/12/2000"
}
...
在mapreduce之后,预期的输出json是:
{
"_id" : ....,
"userId" : 4,
"movieList" : [ {
"movieId" : 2
"movieRating" : 5
},
{
"movieId" : 1
"movieRating" : 4
}
...
]
}
{
"_id" : ....,
"userId" : 5,
"movieList" : ...
}
...
1条答案
按热度按时间lsmd5eda1#
您只需要运行一个聚合管道,它由
$group
总结文档的阶段。这将按指定的标识符表达式对输入文档进行分组,并应用累加器表达式。这个$group
管道操作符类似于sqlGROUP BY
条款。在sql中,不能使用GROUP BY
除非使用任何聚合函数。同样,您也必须在mongodb中使用聚合函数。您可以在这里阅读有关聚合函数的更多信息。需要创建
movieList
数组为$push
.之后的另一条管道
$group
舞台就是舞台$project
运算符,用于选择或重塑流中的每个文档,包括、排除或重命名字段,注入计算字段,创建子文档字段,使用数学表达式、日期、字符串和/或逻辑(比较、布尔、控制)表达式-类似于sqlSELECT
条款。最后一步是
$out
将聚合管道的结果文档写入集合的管道。这一定是管道的最后一个阶段。因此,您可以运行以下聚合操作:
使用上面的示例5文档,如果您查询
db.getCollection('movie_ratings_out').find({})
将产生: