mongodb mapreduce-如何在reduce函数中填充数组?

9rygscc1  于 2021-06-02  发布在  Hadoop
关注(0)|答案(1)|浏览(249)

我有一个有列的电影数据库 userId , movieId , movie-categoryId , reviewId , movieRating 以及 reviewDate .
在我的Map器中,我想提取userid->(movieid,movierating)
然后在reducer中,我想把所有的movieid,movierating对按用户分组。
以下是我的尝试:
Map功能:

var map = function() {
    var values={movieId : this.movieId, movieRating : this.movieRating};
    emit(this.userId, values);}

减少功能:

var reduce = function(key,values) {
    var ratings = [];
    values.forEach(function(V){
        var temp = {movieId : V.movieId, movieRating : V.movieRating};
        Array.prototype.push.apply(ratings, temp);
        });
    return {userId : key, ratings : ratings };
}

运行mapreduce:

db.ratings.mapReduce(map, reduce, { out: "map_reduce_step1" })

输出: db.map_reduce_step1.find() ```
{ "_id" : 1, "value" : { "userId" : 1, "ratings" : [ ] } }
{ "_id" : 2, "value" : { "userId" : 2, "ratings" : [ ] } }
{ "_id" : 3, "value" : { "userId" : 3, "ratings" : [ ] } }
{ "_id" : 4, "value" : { "userId" : 4, "ratings" : [ ] } }
{ "_id" : 5, "value" : { "userId" : 5, "ratings" : [ ] } }
{ "_id" : 6, "value" : { "userId" : 6, "ratings" : [ ] } }
{ "_id" : 7, "value" : { "userId" : 7, "ratings" : [ ] } }
{ "_id" : 8, "value" : { "userId" : 8, "ratings" : [ ] } }
{ "_id" : 9, "value" : { "userId" : 9, "ratings" : [ ] } }
{ "_id" : 10, "value" : { "userId" : 10, "ratings" : [ ] } }
{ "_id" : 11, "value" : { "userId" : 11, "ratings" : [ ] } }
{ "_id" : 12, "value" : { "userId" : 12, "ratings" : [ ] } }
{ "_id" : 13, "value" : { "userId" : 13, "ratings" : [ ] } }
{ "_id" : 14, "value" : { "userId" : 14, "ratings" : [ ] } }
{ "_id" : 15, "value" : { "movieId" : 1, "movieRating" : 3 } }
{ "_id" : 16, "value" : { "userId" : 16, "ratings" : [ ] } }

我没有得到预期的产出。事实上,这个输出对我来说毫无意义!
下面是我在reducer中尝试做的python等价物(以防上面没有明确reducer的用途):

def reducer_ratings_by_user(self, user_id, itemRatings):
#Group (item, rating) pairs by userID
ratings = []
for movieID, rating in itemRatings:
ratings.append((movieID, rating))
yield user_id, ratings

编辑1@chridam
以下是我在这里真正想做的事情的概要:
movies.csv文件如下所示:
userid,movieid,moviecategoryid,reviewid,movierating,reviewdate
1,1,1,1,5,7/12/2000
2,1,1,2,5,7/12/2000
3,1,1,3,5,7/12/2000
4,1,1,4,4,7/12/2000
5,1,1,5,4,7/12/2000
6,1,1,6,5,7/15/2000
1,2,1,7,4,7/25/2000
8,1,1,8,4,7/28/2000
9,1,1,9,3,8/3/2000
...
...
我将其导入mongodb:

mongoimport --db SomeName --collection ratings --type csv --headerline --file Movies.csv

然后我尝试应用上面定义的map reduce函数。之后,我将通过执行以下操作将其导出回csv:

mongoexport --db SomeName --collection map_reduce_step1 --csv --out movie_ratings_out.csv --fields ...

这个 `movie_ratings_out.csv` 文件应如下所示:
userid,movieid1,rating1,movieid2,rating2,。。。
1,1,5,2,4
...
...
因此,每一行都包含每个用户的所有(电影、分级)对。
编辑2
样品:

db.ratings.find().pretty()
{
"_id" : ObjectId("57f4a0dd9cb74fc4d344a40f"),
"userId" : 4,
"movieId" : 1,
"movie-categoryId" : 1,
"reviewId" : 4,
"movieRating" : 4,
"reviewDate" : "7/12/2000"
}
{
"_id" : ObjectId("57f4a0dd9cb74fc4d344a410"),
"userId" : 5,
"movieId" : 1,
"movie-categoryId" : 1,
"reviewId" : 5,
"movieRating" : 4,
"reviewDate" : "7/12/2000"
}
{
"_id" : ObjectId("57f4a0dd9cb74fc4d344a411"),
"userId" : 4,
"movieId" : 2,
"movie-categoryId" : 1,
"reviewId" : 6,
"movieRating" : 5,
"reviewDate" : "7/15/2000"
}
{
"_id" : ObjectId("57f4a0dd9cb74fc4d344a412"),
"userId" : 4,
"movieId" : 3,
"movie-categoryId" : 1,
"reviewId" : 2,
"movieRating" : 5,
"reviewDate" : "7/12/2000"
}
...

在mapreduce之后,预期的输出json是:

{
"_id" : ....,
"userId" : 4,
"movieList" : [ {
"movieId" : 2
"movieRating" : 5
},
{
"movieId" : 1
"movieRating" : 4
}
...
]
}
{
"_id" : ....,
"userId" : 5,
"movieList" : ...
}
...

lsmd5eda

lsmd5eda1#

您只需要运行一个聚合管道,它由 $group 总结文档的阶段。这将按指定的标识符表达式对输入文档进行分组,并应用累加器表达式。这个 $group 管道操作符类似于sql GROUP BY 条款。在sql中,不能使用 GROUP BY 除非使用任何聚合函数。同样,您也必须在mongodb中使用聚合函数。您可以在这里阅读有关聚合函数的更多信息。
需要创建 movieList 数组为 $push .
之后的另一条管道 $group 舞台就是舞台 $project 运算符,用于选择或重塑流中的每个文档,包括、排除或重命名字段,注入计算字段,创建子文档字段,使用数学表达式、日期、字符串和/或逻辑(比较、布尔、控制)表达式-类似于sql SELECT 条款。
最后一步是 $out 将聚合管道的结果文档写入集合的管道。这一定是管道的最后一个阶段。
因此,您可以运行以下聚合操作:

db.ratings.aggregate([
    {
        "$group": {
            "_id": "$userId",
            "movieList": {
                "$push": {
                    "movieId": "$movieId",
                    "movieRating": "$movieRating",
                }
            }
        }
    },
    {
        "$project": {
            "_id": 0, "userId": "$_id", "movieList": 1
        }
    },
    { "$out": "movie_ratings_out" }
])

使用上面的示例5文档,如果您查询 db.getCollection('movie_ratings_out').find({}) 将产生:

/* 1 */
{
    "_id" : ObjectId("57f52636b9c3ea346ab1d399"),
    "movieList" : [ 
        {
            "movieId" : 1.0,
            "movieRating" : 4.0
        }
    ],
    "userId" : 5.0
}

/* 2 */
{
    "_id" : ObjectId("57f52636b9c3ea346ab1d39a"),
    "movieList" : [ 
        {
            "movieId" : 1.0,
            "movieRating" : 4.0
        }, 
        {
            "movieId" : 2.0,
            "movieRating" : 5.0
        }, 
        {
            "movieId" : 3.0,
            "movieRating" : 5.0
        }
    ],
    "userId" : 4.0
}

相关问题