pyspark无法在hyperopt sparktrials中评估keras神经网络

t5zmwmid  于 2021-05-17  发布在  Spark
关注(0)|答案(0)|浏览(500)

我有一个奇怪的错误,我已经坚持了几天,无法得到解决。
我的目标是评估hyperopt中的几个keras nn。为了增强评估过程,我使用了sparktrails(另请参见http://hyperopt.github.io/hyperopt/scaleout/spark/). 对于所有的sciket学习回归器,这工作得非常好。但是每次我使用keras nn时,模型都会得到评估,但结果不会返回。以下是我收到的错误消息:

20/11/11 11:30:56 ERROR TaskSetManager: Task 0 in stage 9.0 failed 1 times; aborting job
trial task 9 failed, exception is An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServeWithJobGroup.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 9.0 failed 1 times, most recent failure: Lost task 0.0 in stage 9.0 (TID 9, path executor driver): java.net.SocketException: Connection reset
    at java.net.SocketInputStream.read(SocketInputStream.java:210)
    at java.net.SocketInputStream.read(SocketInputStream.java:141)
    at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
    at java.io.BufferedInputStream.read(BufferedInputStream.java:265)
    at java.io.DataInputStream.readInt(DataInputStream.java:387)
    at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:628)
    at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:621)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:456)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator.foreach(Iterator.scala:941)
    at scala.collection.Iterator.foreach$(Iterator.scala:941)
    at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
    at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
    at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
    at scala.collection.TraversableOnce.to(TraversableOnce.scala:315)
    at scala.collection.TraversableOnce.to$(TraversableOnce.scala:313)
    at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
    at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:307)
    at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:307)
    at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
    at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:294)
    at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:288)
    at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
    at org.apache.spark.rdd.RDD.$anonfun$collect$2(RDD.scala:1004)
    at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2139)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:127)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:446)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:449)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2059)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2008)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2007)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2007)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:973)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:973)
    at scala.Option.foreach(Option.scala:407)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:973)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2239)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2188)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2177)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:775)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2099)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2120)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2139)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2164)
    at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1004)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:388)
    at org.apache.spark.rdd.RDD.collect(RDD.scala:1003)
    at org.apache.spark.api.python.PythonRDD$.collectAndServeWithJobGroup(PythonRDD.scala:183)
    at org.apache.spark.api.python.PythonRDD.collectAndServeWithJobGroup(PythonRDD.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.net.SocketException: Connection reset
    at java.net.SocketInputStream.read(SocketInputStream.java:210)
    at java.net.SocketInputStream.read(SocketInputStream.java:141)
    at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
    at java.io.BufferedInputStream.read(BufferedInputStream.java:265)
    at java.io.DataInputStream.readInt(DataInputStream.java:387)
    at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:628)
    at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:621)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:456)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator.foreach(Iterator.scala:941)
    at scala.collection.Iterator.foreach$(Iterator.scala:941)
    at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
    at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
    at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
    at scala.collection.TraversableOnce.to(TraversableOnce.scala:315)
    at scala.collection.TraversableOnce.to$(TraversableOnce.scala:313)
    at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
    at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:307)
    at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:307)
    at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
    at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:294)
    at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:288)
    at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
    at org.apache.spark.rdd.RDD.$anonfun$collect$2(RDD.scala:1004)
    at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2139)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:127)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:446)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:449)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    ... 1 more
.
 None
  0%|          | 0/10 [00:41<?, ?trial/s, best loss=?]
Total Trials: 10: 0 succeeded, 10 failed, 0 cancelled.
Traceback (most recent call last):
  File "E:/Git/SystemidentificationTool/play/mini_example.py", line 68, in <module>
    best = fmin(f_nn, space, algo=tpe.suggest, max_evals=10, trials=trials)
  File "E:\venv\lib\site-packages\hyperopt\fmin.py", line 522, in fmin
    trials_save_file=trials_save_file,
  File "E:\venv\lib\site-packages\hyperopt\spark.py", line 274, in fmin
    raise e
  File "E:\venv\lib\site-packages\hyperopt\spark.py", line 270, in fmin
    trials_save_file="",  # not supported
  File "E:\venv\lib\site-packages\hyperopt\fmin.py", line 558, in fmin
    "There are no evaluation tasks, cannot return argmin of task losses."
Exception: There are no evaluation tasks, cannot return argmin of task losses.

我构建了一个最小的代码示例,在该示例中,可以使用以下方法重新创建错误:

from hyperopt import fmin, tpe, hp, STATUS_OK, Trials, SparkTrials
from sklearn.metrics import mean_squared_error
import sys
import numpy as np
from pandas import DataFrame
time = np.arange(0, 100, 0.1)

X = DataFrame(np.sin(time))
y = DataFrame(np.cos(time))
X_val = DataFrame(np.sin(time))
y_val = DataFrame(np.cos(time))

space = {'choice': hp.choice('num_layers',
                [ {'layers':'two', },
                {'layers':'three',
                'units3': hp.uniform('units3', 64,1024),
                'dropout3': hp.uniform('dropout3', .25,.75)}
                ]),
        'units': hp.uniform('units', 64,1024),
        'units1': hp.uniform('units1', 64,1024),
        'units2': hp.uniform('units2', 64,1024),

        'dropout1': hp.uniform('dropout1', .25,.75),
        'dropout2': hp.uniform('dropout2',  .25,.75),

        'batch_size' : 28,

        'nb_epochs' :  2,
        'optimizer': hp.choice('optimizer',['adadelta','adam','rmsprop']),
        'activation': 'relu'
    }

def f_nn(params):
    from keras.models import Sequential
    from keras.layers.core import Dense, Dropout, Activation
    from keras.optimizers import Adadelta, Adam

    print ('Params testing: ', params)
    model = Sequential()
    model.add(Dense(params['units1'], input_dim = X.shape[1]))
    model.add(Activation(params['activation']))
    model.add(Dropout(params['dropout1']))

    model.add(Dense(params['units2'], kernel_initializer="glorot_uniform"))
    model.add(Activation(params['activation']))
    model.add(Dropout(params['dropout2']))

    if params['choice']['layers']== 'three':
        model.add(Dense(params['choice']['units3'], kernel_initializer="glorot_uniform"))
        model.add(Activation(params['activation']))
        model.add(Dropout(params['choice']['dropout3']))

    model.add(Dense(1))
    model.add(Activation('sigmoid'))
    model.compile(loss='binary_crossentropy', optimizer=params['optimizer'])

    model.fit(X, y, epochs=params['nb_epochs'], batch_size=params['batch_size'])

    pred_auc =model.predict(X_val)
    acc = mean_squared_error(y_val, pred_auc)
    print('AUC:', acc)
    sys.stdout.flush()
    return {'loss': -acc, 'status': STATUS_OK}

trials = SparkTrials()
best = fmin(f_nn, space, algo=tpe.suggest, max_evals=10, trials=trials)
print('best: ', best)

我的系统设置如下:python 3.7 hyperopt 0.2.5 keras 2.4.3 tensorflow 2.3.1 pyspark 3.0.1
我使用java版本:openjdk version“1.8.0\u 272”openjdk runtime environment(adoptopenjdk)(build 1.8.0\u 272-b10)openjdk 64位服务器vm(adoptopenjdk)(build 25.272-b10,混合模式)
和spark版本3.0.1
有人能帮我吗???

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题