scala—在大量分区上处理upserts不够快

n3schb8v  于 2021-07-12  发布在  Spark
关注(0)|答案(1)|浏览(346)

问题
我们在adls gen2上有一个delta lake设置,如下表所示: bronze.DeviceData :按到达日期划分( Partition_Date ) silver.DeviceData :按事件日期和时间划分( Partition_Date 以及 Partition_Hour )
我们从事件中心将大量数据(每天超过6亿条记录)摄取到 bronze.DeviceData (仅附加)。然后,我们以流式方式处理新文件,并将它们插入到 silver.DeviceData 使用delta merge命令(见下文)。
到达bronze表的数据可以包含来自任何分区的银色数据(例如,设备可以发送它在本地缓存的历史数据)。但是,任何一天到达的数据中都有超过90%来自分区 Partition_Date IN (CURRENT_DATE(), CURRENT_DATE() - INTERVAL 1 DAYS, CURRENT_DATE() + INTERVAL 1 DAYS) . 因此,要插入数据,我们有以下两个spark任务:
“fast”:处理上述三个日期分区中的数据。延迟在这里很重要,所以我们对这些数据进行优先级排序
“slow”:处理其余部分(除了这三个日期分区之外的任何部分)。延迟并不重要,但它应该在一个“合理”的时间内(我认为不超过一周)
现在我们来讨论这个问题:虽然“慢”作业中的数据量要小得多,但它运行了几天,只处理一天的慢青铜数据,并且有一个大集群。原因很简单:它必须读取和更新许多银色分区(每次超过1000个日期分区),而且由于更新很小,但日期分区可能是千兆字节,因此这些合并命令效率很低。
此外,随着时间的推移,这个缓慢的工作将变得越来越慢,因为它接触到的银色分区将增长。
问题
我们的分区方案和快/慢spark作业设置通常是解决这个问题的好方法吗?
可以做些什么来改进这个设置?我们希望降低慢工的成本和延迟,并找到一种方法,使慢工随任何一天以铜板形式到达的数据量而增长,而不是随银表的大小而增长
附加信息
我们需要merge命令,因为某些上游服务可以重新处理历史数据,然后也应该更新silver表
银表的模式:

CREATE TABLE silver.DeviceData (
  DeviceID LONG NOT NULL, -- the ID of the device that sent the data
  DataType STRING NOT NULL, -- the type of data it sent
  Timestamp TIMESTAMP NOT NULL, -- the timestamp of the data point
  Value DOUBLE NOT NULL, -- the value that the device sent
  UpdatedTimestamp TIMESTAMP NOT NULL, -- the timestamp when the value arrived in bronze
  Partition_Date DATE NOT NULL, -- = TO_DATE(Timestamp)
  Partition_Hour INT NOT NULL -- = HOUR(Timestamp)
)
USING DELTA
PARTITIONED BY (Partition_Date, Partition_Hour)
LOCATION '...'

我们的合并命令:

val silverTable = DeltaTable.forPath(spark, silverDeltaLakeDirectory)

val batch = ... // the streaming update batch

// the dates and hours that we want to upsert, for partition pruning
// collected from the streaming update batch
val dates = "..."
val hours = "..."

val mergeCondition = s"""
  silver.Partition_Date IN ($dates)
  AND silver.Partition_Hour IN ($hours)
  AND silver.Partition_Date = batch.Partition_Date
  AND silver.Partition_Hour = batch.Partition_Hour
  AND silver.DeviceID = batch.DeviceID
  AND silver.Timestamp = batch.Timestamp
  AND silver.DataType = batch.DataType
"""

silverTable.alias("silver")
  .merge(batch.alias("batch"), mergeCondition)
  // only merge if the event is newer
  .whenMatched("batch.UpdatedTimestamp > silver.UpdatedTimestamp").updateAll
  .whenNotMatched.insertAll
  .execute
ghg1uchk

ghg1uchk1#

在databricks上,有几种方法可以优化 merge into 操作:
对属于联接条件的列使用zorder执行优化。这可能取决于特定的dbr版本,因为旧版本(7.6 iirc之前)使用的是real zorder算法,该算法适用于较少的列数,而dbr 7.6+默认使用hilbert空间填充曲线
使用较小的文件大小-默认情况下, OPTIMIZE 创建需要重写的1gb文件。你可以用 spark.databricks.delta.optimize.maxFileSize 将文件大小设置为32mb-64mb,以便重写更少的数据
表分区上的使用条件(您已经这样做了)
不要使用自动压缩,因为它不能执行zorder,而是使用zorder运行显式优化。有关详细信息,请参阅文档
调整列的索引,这样它将只索引条件和查询所需的列。它与合并部分相关,但可以稍微提高写入速度,因为不会收集不用于查询的列的统计信息。
spark summit的这个演示讨论了 merge into -观察哪些指标等。
我不能百分之百肯定你需要条件 silver.Partition_Date IN ($dates) AND silver.Partition_Hour IN ($hours) 因为如果传入数据中没有特定的分区,则可能会读取比所需更多的数据,但这需要查看执行计划。这篇知识库文章解释了如何确保 merge into 使用分区修剪。

相关问题