问题
我的数据结构如下:
train_info:(over 30000 rows)
----------
odt:string (unique)
holiday_type:string
od_label:string
array:array<double> (with variable length depend on different odt and holiday_type )
useful_index:array<int> (length same as vectors)
...(other not important cols)
label_data:(over 40000 rows)
----------
holiday_type:string
od_label: string
l_origin_array:array<double> (with variable length)
...(other not important cols)
my expected result is like this(length same with train_info):
--------------
odt:string
holiday_label:string
od_label:string
prediction:int
我的解决方案如下:
if __name__=='__main __'
loop_item = train_info.collect()
result = knn_for_loop(spark, loop_item,train_info.schema,label_data)
----- do something -------
def knn_for_loop(spark, predict_list, schema, label_data):
result = list()
for i in predict_list:
# turn this Row col to Data Frame and join on label data
# across to this row data pick label data array data
predict_df = spark.sparkContext.parallelize([i]).toDF(schema) \
.join(label_data, on=['holiday_type', "od_label"], how='left') \
.withColumn("l_array",
UDFuncs.value_from_array_by_index(f.col('l_origin_array'), f.col("useful_index"))) \
.toPandas()
# pandas execute
train_x = predict_df.l_array.values
train_y = predict_df.label.values
test_x = predict_df.array.values[0]
test_y = KNN(train_x, train_y, test_x)
result.append((i['odt'], i['holiday_type'], i['od_label'], test_y))
return result
它的工作,但真的很慢,我估计每行需要18秒。
在r语言中,我可以很容易地使用do函数:
列车信息%>%分组依据(odt)%>%do(,knn循环,标签数据)
我尝试的东西
我尝试在使用前连接它们,在计算时查询它们,但数据太大,无法运行(连接后这两个df有4亿行,占用了配置单元上180gb的磁盘空间,查询速度非常慢)。我试着使用它,但它只允许一个pd.data.frame参数(慢)。
我尝试使用udf,但udf无法接收Dataframe对象。
我试图使用spark knn包,但我运行时出错,可能是我的脱机安装错误。
谢谢你的帮助。
暂无答案!
目前还没有任何答案,快来回答吧!