问题:
我一直致力于使用pyspark和spark-ml库分发交叉验证过程,因此它比使用常规顺序计算(即scikit)所需的时间更少。但是,我在做这件事时遇到了一些问题。具体地说,当我开始工作时,我不断得到消息“广播大小为x的大任务二进制文件”(x是从1700kib到6mib的数字)。在我离开作业一段时间后,它最终以消息“job x cancelled because sparkcontext was shutdown”(对于许多xs=jobs)和“error transportrequesthandler:error while invoking rpchandler#receive()for one-way message”结束。org.apache.spark.sparkexception:找不到“GrassedScheduler”。
推理:
因为我必须修改“pyspark.ml.tuning#crossvalidator”中crossvalidator#fit方法的源代码,所以我对它的工作方式非常熟悉,知道它分配任务的方式是通过对数据集的每个分割进行并行化,并使用不同的参数设置训练模型。也就是说,crossvalidator\u fit每次都将整个数据集发送给执行者,在每个执行者中单独训练一个具有特定参数组合的模型,而且spark似乎不喜欢过多地广播数据集。以下是pyspark.ml.tunning拟合方法的相关部分:
for i in range(nFolds):
validation = datasets[i][1].cache()
train = datasets[i][0].cache()
tasks = _parallelFitTasks(est, train, eva, validation, epm, collectSubModelsParam)
for j, metric, subModel in pool.imap_unordered(lambda f: f(), tasks):
metrics[j] += (metric / nFolds)
if collectSubModelsParam:
subModels[i][j] = subModel
validation.unpersist()
train.unpersist()
我试过的:
我已经尝试了最常见的广播警告解决方案,我得到即使我已经想象他们不会在我的情况下工作。具体来说,我修改了数据分区和parallelization参数,以及执行器和驱动程序的内存大小。
我很肯定,如果在ml库中存在crossvalidator的分布式实现,那是因为它实际上很有用。但是,我肯定遗漏了一些东西,因为如果我的数据集很大并且需要广播很多次(因为实现的原因),我无法思考如何让它工作。也许我错过了什么?
暂无答案!
目前还没有任何答案,快来回答吧!