I am basically implementing a recommender system by scaling it out on Hadoop.
In the first step, I try to compute the similarity between every pair of items in the input file:
{item a, item b, similarity}
The output file becomes very large (for a 60 KB input I get a 6 MB output file).
So I wondered whether it would be better to store the results in a Python dict and print the dict only once, after the whole MapReduce program has finished. I did not manage to get this working; please help.
My Python code is:
#!/usr/bin/env python
from mrjob.job import MRJob
from math import sqrt
from itertools import combinations
PRIOR_COUNT = 10
PRIOR_CORRELATION = 0
prefs = {}
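# NOTE: this module-level dict is per-process state; on Hadoop every
# mapper/reducer task gets its own copy, so nothing stored here is shared
# across tasks or collected into a single final print.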
def correlation(size, dot_product, rating_sum,
                rating2sum, rating_norm_squared, rating2_norm_squared):
    '''
    The correlation between two vectors A, B is
    [n * dotProduct(A, B) - sum(A) * sum(B)] /
    sqrt{ [n * norm(A)^2 - sum(A)^2] * [n * norm(B)^2 - sum(B)^2] }
    '''
    numerator = size * dot_product - rating_sum * rating2sum
    denominator = (sqrt(size * rating_norm_squared - rating_sum * rating_sum) *
                   sqrt(size * rating2_norm_squared - rating2sum * rating2sum))
    return (numerator / float(denominator)) if denominator else 0.0
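# Worked example for correlation(): for co-rating pairs (1, 1), (2, 2), (3, 3)
#   size = 3, dot_product = 14, both sums = 6, both squared norms = 14
#   numerator   = 3 * 14 - 6 * 6 = 6
#   denominator = sqrt(3 * 14 - 36) * sqrt(3 * 14 - 36) = 6
# so the result is 6 / 6 = 1.0 (up to floating-point rounding), as expected
# for two identical rating vectors.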
def regularized_correlation(size, dot_product, rating_sum,
                            rating2sum, rating_norm_squared,
                            rating2_norm_squared, virtual_cont,
                            prior_correlation):
    '''
    The regularized correlation between two vectors A, B is
    RegularizedCorrelation = w * ActualCorrelation + (1 - w) * PriorCorrelation
    where w = # actualPairs / (# actualPairs + # virtualPairs).
    '''
    unregularized_correlation = correlation(size, dot_product, rating_sum,
                                            rating2sum, rating_norm_squared,
                                            rating2_norm_squared)
    w = size / float(size + virtual_cont)
    return w * unregularized_correlation + (1.0 - w) * prior_correlation
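# Example: with PRIOR_COUNT = 10 virtual pairs, a pair co-rated by only
# n = 2 users gets w = 2 / 12, so its raw correlation is shrunk heavily
# toward PRIOR_CORRELATION = 0 until more co-ratings accumulate.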
class SemicolonValueProtocol(object):
    # don't need to implement read() since we aren't using it
    def write(self, key, values):
        return ';'.join(str(v) for v in values)
class BooksSimilarities(MRJob):
    # OUTPUT_PROTOCOL = SemicolonValueProtocol
    def steps(self):
        return [
            self.mr(mapper=self.group_by_user_rating,
                    reducer=self.count_ratings_users_freq),
            self.mr(mapper=self.pairwise_items,
                    reducer=self.calculate_similarity),
            self.mr(mapper=self.calculate_ranking,
                    reducer=self.top_similar_items)]
    def group_by_user_rating(self, key, line):
        '''
        Emit the user_id and group by their ratings (item and rating):
        17  70,3
        35  21,1
        49  19,2
        49  21,1
        49  70,4
        87  19,1
        87  21,2
        98  19,2
        '''
        line = line.replace('"', '')
        user_id, item_id, rating = line.split(',')
        yield user_id, (item_id, float(rating))
    def count_ratings_users_freq(self, user_id, values):
        '''
        For each user, emit a row containing their "postings"
        (item, rating pairs), plus the user's rating count and sum
        for use in later steps:
        17  1,3,(70,3)
        35  1,1,(21,1)
        49  3,7,(19,2 21,1 70,4)
        87  2,3,(19,1 21,2)
        98  1,2,(19,2)
        '''
        item_count = 0
        item_sum = 0
        final = []
        for item_id, rating in values:
            item_count += 1
            item_sum += rating
            final.append((item_id, rating))
        yield user_id, (item_count, item_sum, final)
    def pairwise_items(self, user_id, values):
        '''
        The output drops the user from the key entirely; instead it emits
        the pair of items as the key:
        19,21  2,1
        19,70  2,4
        21,70  1,4
        19,21  1,2
        '''
        item_count, item_sum, ratings = values
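        # NOTE: combinations() keeps each user's input order, so if two
        # items appear in a different order for another user, (a, b) and
        # (b, a) end up under different reducer keys; sorting the ratings
        # by item id first would make the pair key canonical.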
        for item1, item2 in combinations(ratings, 2):
            yield (item1[0], item2[0]), (item1[1], item2[1])
    def calculate_similarity(self, pair_key, lines):
        '''
        Sum the components of each co-rating pair across all users who
        rated both item x and item y, then calculate the pairwise Pearson
        similarity and co-rating counts. The similarities are normalized
        to the [0, 1] scale because we do a numerical sort:
        19,21  0.4,2
        21,19  0.4,2
        19,70  0.6,1
        70,19  0.6,1
        21,70  0.1,1
        70,21  0.1,1
        '''
        sum_xx, sum_xy, sum_yy, sum_x, sum_y, n = (0.0, 0.0, 0.0, 0.0, 0.0, 0)
        item_xname, item_yname = pair_key
        for item_x, item_y in lines:
            sum_xy += item_x * item_y
            sum_y += item_y
            sum_x += item_x
            sum_xx += item_x * item_x
            sum_yy += item_y * item_y
            n += 1
        reg_corr_sim = regularized_correlation(n, sum_xy, sum_x, sum_y,
                                               sum_xx, sum_yy, PRIOR_COUNT,
                                               PRIOR_CORRELATION)
        yield (item_xname, item_yname), (reg_corr_sim, n)
    def calculate_ranking(self, item_keys, values):
        '''
        Emit items with the similarity in the key for ranking:
        19,0.4  70,1
        19,0.6  21,2
        21,0.6  19,2
        21,0.9  70,1
        70,0.4  19,1
        70,0.9  21,1
        '''
        reg_corr_sim, n = values
        item_x, item_y = item_keys
        if int(n) > 0:
            yield (item_x, reg_corr_sim), (item_y, n)
    def top_similar_items(self, key_sim, similar_ns):
        '''
        For each item, emit the K closest items as a semicolon-separated
        line:
        De La Soul;A Tribe Called Quest;0.6;1
        De La Soul;2Pac;0.4;2
        '''
        item_x, reg_corr_sim = key_sim
        for item_y, n in similar_ns:
            # yield None, (item_x, item_y, reg_corr_sim, n)
            prefs.setdefault(item_x, {})
            prefs[item_x][item_y] = float(reg_corr_sim)
            prefs.setdefault(item_y, {})
            prefs[item_y][item_x] = float(reg_corr_sim)
        print "exiting"
if __name__ == '__main__':
    BooksSimilarities.run()
So what I want after running
python thisfile.py -r hadoop < input.csv > output.txt
is a relatively small output file without duplicates, and a single dict.
In short:
at the moment the program prints "exiting" n times, but I want it printed only once.
Apart from that, is there a better way to implement collaborative filtering by scaling it on Hadoop?
Many thanks in advance.
1 Answer
You are only guaranteed that records with the same key go to the same reducer. So if several reducers run on the cluster, the work is split between them, and you will see many "exiting" lines as each reducer works through its share of the keys. Bear in mind, too, that mrjob calls your reducer once per key, so the print inside top_similar_items fires once per key even within a single reducer task.
Try running it locally first and verify that it works: python thisfile.py < input.csv > output.txt
You could also define a "reducer_final" in steps() to receive control after the last step's reducer has processed all of its keys, and there manage the collected output the way you want.
Check: http://pythonhosted.org/mrjob/job.html#mrjob.job.MRJob.steps
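As a minimal sketch of that idea (assuming mrjob's reducer_final hook; dump_prefs is a made-up name, and the other mappers/reducers stay exactly as in your job):

    def steps(self):
        return [
            self.mr(mapper=self.group_by_user_rating,
                    reducer=self.count_ratings_users_freq),
            self.mr(mapper=self.pairwise_items,
                    reducer=self.calculate_similarity),
            self.mr(mapper=self.calculate_ranking,
                    reducer=self.top_similar_items,
                    reducer_final=self.dump_prefs)]

    def top_similar_items(self, key_sim, similar_ns):
        # only accumulate into prefs here; no per-key yield or print
        item_x, reg_corr_sim = key_sim
        for item_y, n in similar_ns:
            prefs.setdefault(item_x, {})[item_y] = float(reg_corr_sim)
            prefs.setdefault(item_y, {})[item_x] = float(reg_corr_sim)

    def dump_prefs(self):
        # called once per reducer task, after all of its keys are processed
        yield None, prefs

Note that on Hadoop each reducer task would still emit its own partial prefs dict; to end up with exactly one, you would have to merge them in one more step or run the final step with a single reducer.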
Best regards,