我将一些pandasDataframe加载到内存中,用一个目标函数结束它们,然后将该目标函数传递给hyperopt.fmin(),并指示hyperopt使用sparktrials对象(我还用mlflow跟踪这个)。Dataframe相对较大(约11gb)。这一切都发生在运行7.1ml databricks运行时的databricks集群上。
我的问题是:引擎盖下面发生了什么?当python函数在驱动程序上定义,然后在工作程序上执行时会发生什么?
当我这样做的时候,内存使用爆炸,需要很长时间才能完成(见下面的图表):
X_train, y_train, X_test, y_test = load_data() # Load data frames
mlflow.autolog()
search_space = [
hp.uniform('alpha', 0.0000001, 0.1),
]
max_evals = 100
parallelism = 2
spark_trials = hyperopt.SparkTrials(parallelism=parallelism)
with mlflow.start_run():
def objective(params):
return {
'loss': X_train.shape[0] * X_test.shape[0] * y_test.shape[0] * y_train.shape[0] * params[0],
'status': STATUS_OK
}
fmin(
fn=objective,
space=search_space,
algo=hyperopt.tpe.suggest,
max_evals=max_evals,
trials=spark_trials)
ganglia为两小时的运行绘制图表(x.x.x.4是驱动程序节点)。工作没有完成(我杀了它)
注意网络图,我推测Dataframe在每次需要时都会被传输。
不过,通过显式地将帧广播到工作节点,我在某种程度上解决了这个问题:
sc = SparkContext.getOrCreate()
bc_X_train, bc_y_train, bc_X_test, bc_y_test = list(map(sc.broadcast, load_data()))
mlflow.autolog()
search_space = [
hp.uniform('alpha', 0.0000001, 0.1),
]
max_evals = 100
parallelism = 2
spark_trials = hyperopt.SparkTrials(parallelism=parallelism)
with mlflow.start_run():
def objective(params):
return {
'loss': bc_X_train.value.shape[0] * bc_X_test.value.shape[0] * bc_y_test.value.shape[0] * bc_y_train.value.shape[0] * params[0],
'status': STATUS_OK
}
fmin(
fn=objective,
space=search_space,
algo=hyperopt.tpe.suggest,
max_evals=max_evals,
trials=spark_trials)
这段代码的ganglia图表显示了一次用于广播Dataframe的网络。完成100次评估大约需要12分钟。
暂无答案!
目前还没有任何答案,快来回答吧!