pyspark套接字超时错误将self.\u sock.recv\u返回(b)socket.timeout:超时

9jyewag0  于 2021-05-19  发布在  Spark
关注(0)|答案(1)|浏览(1042)

我已经为协同过滤推荐系统编写了一个spark程序(Python3.6和spark 2.3.2),它适用于两种情况:
案例1:基于项目的cf推荐系统
案例2:基于用户的最小哈希lsh cf推荐系统
我已经写了训练和预测程序,有这两种情况。我的代码适用于基于用户的推荐,但当我尝试为基于项目的cf训练模型时,出现以下错误:

2020-10-18 20:12:33 ERROR Executor:91 - Exception in task 0.0 in stage 23.0 (TID 196)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "C:\spark\spark-2.3.2-bin-hadoop2.6\python\lib\pyspark.zip\pyspark\worker.py", line 238, in main
  File "C:\spark\spark-2.3.2-bin-hadoop2.6\python\lib\pyspark.zip\pyspark\serializers.py", line 690, in read_int
    length = stream.read(4)
  File "C:\Users\17372\AppData\Local\Programs\Python\Python36\lib\socket.py", line 586, in readinto
    return self._sock.recv_into(b)
socket.timeout: timed out

我尝试使用以下链接上的解决方案解决此问题:应用程序运行一段时间后pyspark套接字超时异常
它不起作用。
我找到了一个在执行中添加“--spark.worker.timeout=120”的解决方案,如下所示:

bin\spark-submit task3train.py train_review.json task3item.model item_based --spark.worker.timeout=120

我仍然看到同样的错误。我也试过抓积木,但我不知道怎么做对。
我该怎么办?
基于项目的cf的我的代码:

if model_type == ITEM_BASED_MODEL:
        # group original data by bidx, and remove those unpopular business (rated time < 3)
        # tuple(bidx, (uidx, score))
        # [(5306, [(3662, 5.0), (3218, 5.0), (300, 5.0),..]), ()
        shrunk_bid_uids_rdd = input_lines \
            .map(lambda kv: (bus_index_dict[kv[1]], (user_index_dict[kv[0]], kv[2]))) \
            .groupByKey().mapValues(lambda uid_score: list(uid_score)) \
            .filter(lambda bid_uid_score: len(bid_uid_score[1]) >= CO_RATED_THRESHOLD) \
            .mapValues(lambda vals: [{uid_score[0]: uid_score[1]} for uid_score in vals]) \
            .mapValues(lambda val: flatMixedList(val))

        candidate_bids = shrunk_bid_uids_rdd.map(lambda bid_uids: bid_uids[0]).coalesce(2)

        # convert shrunk_bid_uids_rdd into dict form
        # dict(bidx: dict(uidx: score))
        # => e.g. {5306: defaultdict(<class 'list'>, {3662: 5.0, 3218: 5.0, 300: 5.0...}),
        bid_uid_dict = shrunk_bid_uids_rdd \
            .map(lambda bid_uid_score: {bid_uid_score[0]: bid_uid_score[1]}) \
            .flatMap(lambda kv_items: kv_items.items()).collectAsMap()

        # generate all possible pair between candidate bidx
        # and compute the pearson similarity
        candidate_pair = candidate_bids.cartesian(candidate_bids) \
            .filter(lambda id_pair: id_pair[0] < id_pair[1]) \
            .filter(lambda id_pair: existNRecords(bid_uid_dict[id_pair[0]],
                                                  bid_uid_dict[id_pair[1]])) \
            .map(lambda id_pair: (id_pair,
                                  computeSimilarity(bid_uid_dict[id_pair[0]],
                                                    bid_uid_dict[id_pair[1]]))) \
            .filter(lambda kv: kv[1] > 0) \
            .map(lambda kv: {"b1": reversed_index_bus_dict[kv[0][0]],
                             "b2": reversed_index_bus_dict[kv[0][1]],
                             "sim": kv[1]})
fsi0uk1n

fsi0uk1n1#

我在本地运行python3.7和spark2.4.4时遇到了同样的错误。Spark选项的组合没有帮助。
我在看Parquet地板文件中的一排排,这些文件严重倾斜。它们包含一个二进制列,值在几个字节到10mb之间。结果Dataframe包含的分区数量相对较少,尽管为 spark.default.parallelism . 分区的数量仍然与我正在读取的Parquet文件的数量相似,并且我不断得到一个套接字超时。
我试着 spark.sql.files.maxPartitionBytes 一个足够小的值,但错误仍然存在。唯一有用的是 repartition 读取数据后,可以增加分区的数量,并更均匀地分布行。请注意,这只是一个观察,我仍然无法解释为什么错误消失了。
如果数据倾斜也是这里的一个主题,那么可以通过将代码更改为:

input_lines \
    .repartition(n) \
    .map(...)
``` `n` 取决于你的集群和工作特点,有一个最佳点。如果 `n` 太低,您将获得套接字超时。如果 `n` 太大会对性能产生负面影响。

相关问题