我有一组物联网数据,在python作业中由azure Databricks转换。Databricks集群是13.3 LTS(包括Apache Spark 3.4.1,Scala 2.12)Standard_DS3_v2
我把这些信息放进一个增量表里。
我从delta表中检索数据,并在for循环中的2小时帧内开始将数据与其本身进行对抗,通过这样做,我正在进行大量的对抗(n*n)。
问题是udf可以工作,但速度很慢,1000条消息需要15分钟,而我需要在150万条消息的流量下停留不到10分钟。
我对udf做了以下操作:
for row in rows:
try:
mse = udf(lambda x : sum( (a - b)*(a - b) for a, b in zip(x[:-1] , row["enc"][:-1]))/450 )
df_compare = df_compare.withColumn('diff_enc',sqrt(mse(df_entrance_compare.enc)))
字符串
其中rows是一个具有相同模式的df_compare和row[“enc”]的数组,并且df_compare.enc包含每个单元格的451个元素的列表:
| 列编码|
| --|
| [1.0、2.0、3.0、4.0、.]|
| [1.0、2.0、3.0、4.0、.]|
有没有一个更聪明更快的方法来使这个计算更快使用Spark?
停止使用Databricks而使用noslql数据库并在函数中进行计算可能是一个更好的主意吗?
1条答案
按热度按时间ltqd579y1#
如果你可以升级到Spark 3.5.0,你将可以使用reduce,它允许你用纯Spark函数来表达均方误差计算,没有UDF:
字符串