问题
我们在adls gen2上有一个delta lake设置,如下表所示: bronze.DeviceData
:按到达日期划分( Partition_Date
) silver.DeviceData
:按事件日期和时间划分( Partition_Date
以及 Partition_Hour
)
我们从事件中心将大量数据(每天超过6亿条记录)摄取到 bronze.DeviceData
(仅附加)。然后,我们以流式方式处理新文件,并将它们插入到 silver.DeviceData
使用delta merge命令(见下文)。
到达bronze表的数据可以包含来自任何分区的银色数据(例如,设备可以发送它在本地缓存的历史数据)。但是,任何一天到达的数据中都有超过90%来自分区 Partition_Date IN (CURRENT_DATE(), CURRENT_DATE() - INTERVAL 1 DAYS, CURRENT_DATE() + INTERVAL 1 DAYS)
. 因此,要插入数据,我们有以下两个spark任务:
“fast”:处理上述三个日期分区中的数据。延迟在这里很重要,所以我们对这些数据进行优先级排序
“slow”:处理其余部分(除了这三个日期分区之外的任何部分)。延迟并不重要,但它应该在一个“合理”的时间内(我认为不超过一周)
现在我们来讨论这个问题:虽然“慢”作业中的数据量要小得多,但它运行了几天,只处理一天的慢青铜数据,并且有一个大集群。原因很简单:它必须读取和更新许多银色分区(每次超过1000个日期分区),而且由于更新很小,但日期分区可能是千兆字节,因此这些合并命令效率很低。
此外,随着时间的推移,这个缓慢的工作将变得越来越慢,因为它接触到的银色分区将增长。
问题
我们的分区方案和快/慢spark作业设置通常是解决这个问题的好方法吗?
可以做些什么来改进这个设置?我们希望降低慢工的成本和延迟,并找到一种方法,使慢工随任何一天以铜板形式到达的数据量而增长,而不是随银表的大小而增长
附加信息
我们需要merge命令,因为某些上游服务可以重新处理历史数据,然后也应该更新silver表
银表的模式:
CREATE TABLE silver.DeviceData (
DeviceID LONG NOT NULL, -- the ID of the device that sent the data
DataType STRING NOT NULL, -- the type of data it sent
Timestamp TIMESTAMP NOT NULL, -- the timestamp of the data point
Value DOUBLE NOT NULL, -- the value that the device sent
UpdatedTimestamp TIMESTAMP NOT NULL, -- the timestamp when the value arrived in bronze
Partition_Date DATE NOT NULL, -- = TO_DATE(Timestamp)
Partition_Hour INT NOT NULL -- = HOUR(Timestamp)
)
USING DELTA
PARTITIONED BY (Partition_Date, Partition_Hour)
LOCATION '...'
我们的合并命令:
val silverTable = DeltaTable.forPath(spark, silverDeltaLakeDirectory)
val batch = ... // the streaming update batch
// the dates and hours that we want to upsert, for partition pruning
// collected from the streaming update batch
val dates = "..."
val hours = "..."
val mergeCondition = s"""
silver.Partition_Date IN ($dates)
AND silver.Partition_Hour IN ($hours)
AND silver.Partition_Date = batch.Partition_Date
AND silver.Partition_Hour = batch.Partition_Hour
AND silver.DeviceID = batch.DeviceID
AND silver.Timestamp = batch.Timestamp
AND silver.DataType = batch.DataType
"""
silverTable.alias("silver")
.merge(batch.alias("batch"), mergeCondition)
// only merge if the event is newer
.whenMatched("batch.UpdatedTimestamp > silver.UpdatedTimestamp").updateAll
.whenNotMatched.insertAll
.execute
1条答案
按热度按时间ghg1uchk1#
在databricks上,有几种方法可以优化
merge into
操作:对属于联接条件的列使用zorder执行优化。这可能取决于特定的dbr版本,因为旧版本(7.6 iirc之前)使用的是real zorder算法,该算法适用于较少的列数,而dbr 7.6+默认使用hilbert空间填充曲线
使用较小的文件大小-默认情况下,
OPTIMIZE
创建需要重写的1gb文件。你可以用spark.databricks.delta.optimize.maxFileSize
将文件大小设置为32mb-64mb,以便重写更少的数据表分区上的使用条件(您已经这样做了)
不要使用自动压缩,因为它不能执行zorder,而是使用zorder运行显式优化。有关详细信息,请参阅文档
调整列的索引,这样它将只索引条件和查询所需的列。它与合并部分相关,但可以稍微提高写入速度,因为不会收集不用于查询的列的统计信息。
spark summit的这个演示讨论了
merge into
-观察哪些指标等。我不能百分之百肯定你需要条件
silver.Partition_Date IN ($dates) AND silver.Partition_Hour IN ($hours)
因为如果传入数据中没有特定的分区,则可能会读取比所需更多的数据,但这需要查看执行计划。这篇知识库文章解释了如何确保merge into
使用分区修剪。