下面是我们在spark(scala)中实现的工作流。
1-从cassandra读取数据,创建一个具有相应 float
属性值。
2-将此数据集转换为Dataframe并将其缓存在spark中。这是用于每个传入的“匹配分数计算”请求的目标Dataframe。
3-对于每个请求,计算传入属性值和目标Dataframe中每行的目标属性值之间的匹配分数 using a scala UDF
. 创建一个新的Dataframe,其中包含具有匹配分数的唯一ID。
4-从第二个Dataframe中筛选、排序和收集前3个匹配分数。
当我提交一个作业时,对于一个100万行的目标Dataframe,大约需要3秒钟才能完成。
我将上面的步骤(4)替换为一个简单的count()操作,让spark用匹配分数计算第二个Dataframe,然后进行count()。这大约需要0.2秒。
此外,当我检查这个简化的作业时,我发现创建第二个Dataframe只花费了75毫秒,而count()只花费了大约85毫秒—如果这有意义的话。
因此,我得出的结论是,我需要找到一种方法来摆脱需要洗牌的部分(4)。
我在考虑下面的机制,想知道这在技术上是否可行:
1-同上
2-也一样
3-同样,但增加了如下逻辑/机制:
创建一个累加器来容纳 list
对(匹配分数,唯一id)
创建一个局部scala变量。在计算目标Dataframe的每一行的匹配分数并计算第二个Dataframe时,使用计算出的最高分数更新此局部变量 on that executor
. 因此,每个执行者都必须在这个变量中存储自己的最高计算分数(以及唯一id)。
将存储在此局部执行器变量中的值附加到累加器。所以在每项工作结束时,我们有一个 (score,ID)
驱动程序中的元组。
返回列表中的前3个元素(累加器)
4-不运行
我在想,如果这个机制可以实现,我们可以获得巨大的性能提高,从3秒到0.1秒左右。
你认为这在技术上可行吗?我是否可以创建一个局部scala变量,该变量将在作业执行期间在每个执行器中分别更新,并在下一个匹配计算作业开始时重置?
蓄电池怎么样?是否会在每个作业结束时自动重置?或者我应该以编程方式重置这些局部变量和累加器?
谢谢。
暂无答案!
目前还没有任何答案,快来回答吧!