我已经为协同过滤推荐系统编写了一个spark程序(Python3.6和spark 2.3.2),它适用于两种情况:
案例1:基于项目的cf推荐系统
案例2:基于用户的最小哈希lsh cf推荐系统
我已经写了训练和预测程序,有这两种情况。我的代码适用于基于用户的推荐,但当我尝试为基于项目的cf训练模型时,出现以下错误:
2020-10-18 20:12:33 ERROR Executor:91 - Exception in task 0.0 in stage 23.0 (TID 196)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "C:\spark\spark-2.3.2-bin-hadoop2.6\python\lib\pyspark.zip\pyspark\worker.py", line 238, in main
File "C:\spark\spark-2.3.2-bin-hadoop2.6\python\lib\pyspark.zip\pyspark\serializers.py", line 690, in read_int
length = stream.read(4)
File "C:\Users\17372\AppData\Local\Programs\Python\Python36\lib\socket.py", line 586, in readinto
return self._sock.recv_into(b)
socket.timeout: timed out
我尝试使用以下链接上的解决方案解决此问题:应用程序运行一段时间后pyspark套接字超时异常
它不起作用。
我找到了一个在执行中添加“--spark.worker.timeout=120”的解决方案,如下所示:
bin\spark-submit task3train.py train_review.json task3item.model item_based --spark.worker.timeout=120
我仍然看到同样的错误。我也试过抓积木,但我不知道怎么做对。
我该怎么办?
基于项目的cf的我的代码:
if model_type == ITEM_BASED_MODEL:
# group original data by bidx, and remove those unpopular business (rated time < 3)
# tuple(bidx, (uidx, score))
# [(5306, [(3662, 5.0), (3218, 5.0), (300, 5.0),..]), ()
shrunk_bid_uids_rdd = input_lines \
.map(lambda kv: (bus_index_dict[kv[1]], (user_index_dict[kv[0]], kv[2]))) \
.groupByKey().mapValues(lambda uid_score: list(uid_score)) \
.filter(lambda bid_uid_score: len(bid_uid_score[1]) >= CO_RATED_THRESHOLD) \
.mapValues(lambda vals: [{uid_score[0]: uid_score[1]} for uid_score in vals]) \
.mapValues(lambda val: flatMixedList(val))
candidate_bids = shrunk_bid_uids_rdd.map(lambda bid_uids: bid_uids[0]).coalesce(2)
# convert shrunk_bid_uids_rdd into dict form
# dict(bidx: dict(uidx: score))
# => e.g. {5306: defaultdict(<class 'list'>, {3662: 5.0, 3218: 5.0, 300: 5.0...}),
bid_uid_dict = shrunk_bid_uids_rdd \
.map(lambda bid_uid_score: {bid_uid_score[0]: bid_uid_score[1]}) \
.flatMap(lambda kv_items: kv_items.items()).collectAsMap()
# generate all possible pair between candidate bidx
# and compute the pearson similarity
candidate_pair = candidate_bids.cartesian(candidate_bids) \
.filter(lambda id_pair: id_pair[0] < id_pair[1]) \
.filter(lambda id_pair: existNRecords(bid_uid_dict[id_pair[0]],
bid_uid_dict[id_pair[1]])) \
.map(lambda id_pair: (id_pair,
computeSimilarity(bid_uid_dict[id_pair[0]],
bid_uid_dict[id_pair[1]]))) \
.filter(lambda kv: kv[1] > 0) \
.map(lambda kv: {"b1": reversed_index_bus_dict[kv[0][0]],
"b2": reversed_index_bus_dict[kv[0][1]],
"sim": kv[1]})
1条答案
按热度按时间fsi0uk1n1#
我在本地运行python3.7和spark2.4.4时遇到了同样的错误。Spark选项的组合没有帮助。
我在看Parquet地板文件中的一排排,这些文件严重倾斜。它们包含一个二进制列,值在几个字节到10mb之间。结果Dataframe包含的分区数量相对较少,尽管为
spark.default.parallelism
. 分区的数量仍然与我正在读取的Parquet文件的数量相似,并且我不断得到一个套接字超时。我试着
spark.sql.files.maxPartitionBytes
一个足够小的值,但错误仍然存在。唯一有用的是repartition
读取数据后,可以增加分区的数量,并更均匀地分布行。请注意,这只是一个观察,我仍然无法解释为什么错误消失了。如果数据倾斜也是这里的一个主题,那么可以通过将代码更改为: