我有一个数据管道,我必须在 distinct final sink上执行许多小的增量操作。想想看,取一个巨大的输入框架,将其拆分,并将内容写入许多小的单独表/桶。
我希望在单个Spark进程中串行地执行此操作,因为初始处理的开销很大。在本例中,可以识别一系列要循环的target
操作,并循环它们。
我想出的每一个组合都有 * 驱动程序 * 为串行操作做所有的工作。它最终会耗尽内存并死亡。虽然如果我将每个命令独立复制/粘贴到程序中,它可以在集群中正常运行。
val df = (some complicated query, returning lots of data)
val someOtherDf = (something time consuming, huge df)
// Some large set of small tasks - can come from anywhere
val targets = someOtherDf.select("a_column").distinct.collect.toSeq
targets.foreach(target => process(target)) // this appears to only happen within the driver
// This is fairly "cheap", and must be repeated many times.
def process(df1, df2, someValue) = {
df1.where("column" === lit(someValue)).join(df2, df1.col("joinColumn") === df2.col("joinColumn")).writeTo("someTable").createOrReplace
}
字符串
有没有一种方法可以在一个Spark上下文中串行地执行一个JavaScript框架上的一些进程?
2条答案
按热度按时间1sbrub3j1#
它有助于知道什么在驱动程序上执行,什么在执行器上执行来回答这个问题。This answer讨论了哪些操作发生在驱动程序上,哪些发生在执行器上。它的简短版本是:
join
中一定有语法错误,丢失了第二个表,但对这个答案来说没有关系):字符串
现在,看起来你正在尝试以一种并不真正适合的方式使用Spark。如果你的印象是所有的工作都发生在驱动程序上,这意味着在你的
foreach
循环中(你在上面循环了10万次)在所有开销相关的事情上有更多的工作。(确保一个表存在,创建它,循环你的列表,.)而不是实际过滤你的表,并写出与当前循环迭代相关的记录。我有什么建议来解决此问题?
从最推荐到最不推荐:
1.正如我在评论中所说,我会避免这样做。你会给自己带来很多这样的技术债务。你会在写作和写作方面都有糟糕的表现。(就像你现在的感觉一样)和阅读,这将是一个混乱。Spark是一个大数据工具,可以轻松处理巨大的表,所以处理巨大的表不应该是一个问题。如果你需要这个访问控制,有其他方法可以解决这个问题。但我理解你想探索这个想法,所以在这里添加更多的选项。
1.如果你真的想让你的表有某种“分离”,你可以在写磁盘的时候尝试分区。所以你不会有
foreach
循环,你会有一个大的join
和saveAsTable
,如下所示:型
1.继续这一点,这将给你给予一个磁盘布局,将文件与
column
中所有不同的值分开。我不一定推荐这样做,因为column
列的基数很大(100k),所以这将给予您的性能。你也可以以不同的方式读入文件,甚至可以给不同的分区给予不同的权限。1.继续你正在做的事情,但是在运行你的程序时通过增加
spark.driver.memory
来给你的驱动给予更多的资源。你的执行器资源可能会非常微不足道。这样做会让你远离分布式计算,朝着串行计算的方向发展,所以使用像polars这样的东西来代替Spark可能会给你给予更好的结果。gcuhipw92#
一种方法来实现它,而不会过度使用你的驱动程序可以是替换
collect.toSeq
,它试图将整个数据集拉入驱动程序的内存与toLocalIterator()
,它会做一个分区,在一个时间。只是要注意的警告/注意,如文档中所述:def toLocalIterator():Iterator[T]
返回一个迭代器,它包含此数据集中的所有行。
迭代器将消耗与此Dataset中最大分区相同的内存。
自2.0.0起
注意这会导致多个Spark作业,如果输入数据集是广泛转换的结果(例如,与不同分区的连接),为了避免重新计算,应该首先缓存输入数据集。
如果需要,您可以在缓存之前通过重新分区
someOtherDf.select("a_column").distinct
来进一步控制内存需求。