应用程序运行一段时间后pyspark套接字超时异常

ybzsozfc  于 2021-07-09  发布在  Spark
关注(0)|答案(4)|浏览(515)

我用pyspark来估计logistic回归模型的参数。我使用spark计算似然和梯度,然后使用scipy的最小化函数进行优化(l-bfgs-b)。
我使用客户机模式运行我的应用程序。我的应用程序可以毫无问题地开始运行。但是,一段时间后,它会报告以下错误:

Traceback (most recent call last):
  File "/home/panc/research/MixedLogistic/software/mixedlogistic/mixedlogistic_spark/simulation/20160716-1626/spark_1m_data.py", line 115, in <module>
    res = trainEM2(distData, params0, verbose=True, em_tol=1e-5, opt_method='L-BFGS-B')
  File "/home/panc/research/MixedLogistic/software/mixedlogistic/mixedlogistic_spark/Train2.py", line 166, in trainEM
    options={'disp': False})
  File "/apps/rhel6/Anaconda-2.0.1/lib/python2.7/site-packages/scipy/optimize/_minimize.py", line 447, in minimize
    callback=callback,**options)
  File "/apps/rhel6/Anaconda-2.0.1/lib/python2.7/site-packages/scipy/optimize/lbfgsb.py", line 330, in _minimize_lbfgsb
    f, g = func_and_grad(x)
  File "/apps/rhel6/Anaconda-2.0.1/lib/python2.7/site-packages/scipy/optimize/lbfgsb.py", line 278, in func_and_grad
    f = fun(x, *args)
  File "/apps/rhel6/Anaconda-2.0.1/lib/python2.7/site-packages/scipy/optimize/optimize.py", line 289, in function_wrapper
    return function(*(wrapper_args + args))
  File "/home/panc/research/MixedLogistic/software/mixedlogistic/mixedlogistic_spark/Train2.py", line 146, in fun_observedQj
    return dataAndWeightsj_old.map(lambda _: calObservedQj(_[0], _[1], vparamsj, params0)).sum()
  File "/apps/hathi/spark-1.6.2/python/pyspark/rdd.py", line 995, in sum
    return self.mapPartitions(lambda x: [sum(x)]).fold(0, operator.add)
  File "/apps/hathi/spark-1.6.2/python/pyspark/rdd.py", line 869, in fold
    vals = self.mapPartitions(func).collect()
  File "/apps/hathi/spark-1.6.2/python/pyspark/rdd.py", line 772, in collect
    return list(_load_from_socket(port, self._jrdd_deserializer))
  File "/apps/hathi/spark-1.6.2/python/pyspark/rdd.py", line 142, in _load_from_socket
    for item in serializer.load_stream(rf):
  File "/apps/hathi/spark-1.6.2/python/pyspark/serializers.py", line 139, in load_stream
16/07/16 20:59:10 ERROR python.PythonRDD: Error while sending iterator
java.net.SocketTimeoutException: Accept timed out
    at java.net.PlainSocketImpl.socketAccept(Native Method)
    at java.net.AbstractPlainSocketImpl.accept(AbstractPlainSocketImpl.java:409)
    at java.net.ServerSocket.implAccept(ServerSocket.java:545)
    at java.net.ServerSocket.accept(ServerSocket.java:513)
    at org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:645)
    yield self._read_with_length(stream)
  File "/apps/hathi/spark-1.6.2/python/pyspark/serializers.py", line 156, in _read_with_length
    length = read_int(stream)
  File "/apps/hathi/spark-1.6.2/python/pyspark/serializers.py", line 543, in read_int
    length = stream.read(4)
  File "/apps/rhel6/Anaconda-2.0.1/lib/python2.7/socket.py", line 384, in read
    data = self._sock.recv(left)
socket.timeout: timed out

我还发现 python broken pipe 将spark log level设置为“all”时出错。
我使用的是spark 1.6.2和Java1.8.0\u91。知道怎么回事吗?

--更新--

我发现这与我在程序中使用的优化例程有关。
我所做的是使用em算法(作为迭代算法)用最大似然法估计统计模型。在每次迭代中,我需要通过解决最小化问题来更新参数。spark负责计算我的似然和梯度,然后传递给scipy的最小化例程,在那里我使用l-bfgs-b方法。看来这套程序中有什么东西毁了我的工作。但我不知道是哪一部分造成了这个问题。
另一个观察结果是,在使用相同的示例和相同的程序时,我更改了分区的数量。当分区数很小时,我的程序可以毫无问题地完成。但是,当分区数量变大时,程序开始崩溃。

laximzn5

laximzn51#

我们在ibm的spss modeler中使用pyspark扩展节点时遇到了相同的问题。所有上述解决方案(以及在互联网上可以找到的其他解决方案)都不起作用。在某个时刻,我们发现当我和同事在同一台机器上同时执行pyspark扩展节点时,总是会发生这种情况。这似乎让python的工作人员陷入了混乱或被杀的境地。唯一的解决办法是不要同时执行pyspark的东西。。。

monwx1rj

monwx1rj2#

我有一个类似的问题,对我来说,这解决了它:

import pyspark as ps

conf = ps.SparkConf().setMaster("yarn-client").setAppName("sparK-mer")
conf.set("spark.executor.heartbeatInterval","3600s")
sc = ps.SparkContext('local[4]', '', conf=conf) # uses 4 cores on your local machine

在此处设置其他选项的更多示例:https://gist.github.com/robenalt/5b06415f52009c5035910d91f5b919ad

gkl3eglg

gkl3eglg3#

我也有类似的问题。我有一个迭代,有时执行时间太长,超时。增加的 spark.executor.heartbeatInterval 似乎解决了问题。我将它增加到3600,以确保我不会再次遇到超时,从那时起一切都正常工作。
发件人:http://spark.apache.org/docs/latest/configuration.html :
spark.executor.heartbeatinterval每个执行者对驱动程序的心跳间隔10s。heartbeats让驱动程序知道executor仍然活着,并用进行中任务的度量来更新它。

lx0bsm1f

lx0bsm1f4#

查看执行者日志以了解详细信息。当执行器死亡或被集群管理器杀死时(通常是因为使用了比容器配置更多的内存),我也看到过类似的错误。

相关问题