I am using Apache Spark and want to merge two DataFrames: one holding existing data and one holding (potential) updates. The merge should happen on a given set of key attributes; however, for a single set of key attributes there can be several existing rows that need to be replaced by several new rows (whenever the update data carries a newer timestamp than the existing data).
As a visualization, suppose the existing data is
+------+------+---------+-----------+------------+
| key1 | key2 | subkey3 | timestamp | attributes |
+------+------+---------+-----------+------------+
| 1    | 1    | 0       | 0         | something1 |
| 1    | 1    | 1       | 0         | something2 |
| 1    | 2    | 0       | 0         | something3 |
| 1    | 2    | 1       | 0         | something4 |
| 1    | 3    | 0       | 0         | something5 |
+------+------+---------+-----------+------------+
and the update data is
+------+------+---------+-----------+----------------------+
| key1 | key2 | subkey3 | timestamp | attributes |
+------+------+---------+-----------+----------------------+
| 1    | 1    | 0       | 1         | something_new1       |
| 1    | 1    | 1       | 1         | something_new2       |
| 1    | 1    | 2       | 1         | something_new3       |
| 1    | 2    | 0       | 1         | something_new4       |
| 1    | 2    | 0       | 2         | something_even_newer |
| 1    | 4    | 0       | 1         | something6           |
+------+------+---------+-----------+----------------------+
Then the resulting DataFrame should look like this:
+------+------+---------+-----------+----------------------+
| key1 | key2 | subkey3 | timestamp | attributes |
+------+------+---------+-----------+----------------------+
| 1    | 1    | 0       | 1         | something_new1       |
| 1    | 1    | 1       | 1         | something_new2       |
| 1    | 1    | 2       | 1         | something_new3       |
| 1    | 2    | 0       | 2         | something_even_newer |
| 1    | 3    | 0       | 0         | something5           |
| 1    | 4    | 0       | 1         | something6           |
+------+------+---------+-----------+----------------------+
So in this example the merge happens on the two keys key1 and key2: if the update data contains newer rows for a composite key, all existing rows for that key are replaced by the newest rows from the update data. Note that the number of rows for a given composite key may change in either direction after the update is applied.
One way to solve this is a window ranking like the following:
```
from pyspark.sql.functions import col, rank
from pyspark.sql.window import Window

df_merged = (
    df_old
    .union(df_update)
    # rank rows within each (key1, key2) by timestamp, newest first
    .withColumn(
        "rank",
        rank().over(
            Window.partitionBy(col("key1"), col("key2"))
                  .orderBy(col("timestamp").desc())
        ),
    )
    # rank 1 keeps every row that shares the newest timestamp for its key
    .filter(col("rank") == 1)
    .drop("rank")
)
```
Assuming all the data is stored in Parquet or Delta tables, what is the most efficient way to get the desired behaviour in Spark?
1 Answer
I tried to solve this with the approach below.
Code
Read the two DataFrames.
Dataframe-1
```
val implicits = spark.implicits
import implicits._
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._

// common schema for both input DataFrames
val schema = StructType(
  Array(
    StructField("key1", DataTypes.IntegerType),
    StructField("key2", DataTypes.IntegerType),
    StructField("subkey3", DataTypes.IntegerType),
    StructField("timestamp", DataTypes.IntegerType),
    StructField("attributes", DataTypes.StringType)
  )
)

val data1 =
  """
    | 1 | 1 | 0 | 0 | something1
    | 1 | 1 | 1 | 0 | something2
    | 1 | 2 | 0 | 0 | something3
    | 1 | 2 | 1 | 0 | something4
    | 1 | 3 | 0 | 0 | something5
  """.stripMargin
```
+----+----+-------+---------+----------+
|key1|key2|subkey3|timestamp|attributes|
+----+----+-------+---------+----------+
|1 |1 |0 |0 |something1|
|1 |1 |1 |0 |something2|
|1 |2 |0 |0 |something3|
|1 |2 |1 |0 |something4|
|1 |3 |0 |0 |something5|
+----+----+-------+---------+----------+
root
|-- key1: integer (nullable = true)
|-- key2: integer (nullable = true)
|-- subkey3: integer (nullable = true)
|-- timestamp: integer (nullable = true)
|-- attributes: string (nullable = true)
Dataframe-2
```
val data2 =
  """
    | 1 | 1 | 0 | 1 | something_new1
    | 1 | 1 | 1 | 1 | something_new2
    | 1 | 1 | 2 | 1 | something_new3
    | 1 | 2 | 0 | 1 | something_new4
    | 1 | 2 | 0 | 2 | something_even_newer
    | 1 | 4 | 0 | 1 | something6
  """.stripMargin
```
+----+----+-------+---------+--------------------+
|key1|key2|subkey3|timestamp|attributes |
+----+----+-------+---------+--------------------+
|1 |1 |0 |1 |something_new1 |
|1 |1 |1 |1 |something_new2 |
|1 |1 |2 |1 |something_new3 |
|1 |2 |0 |1 |something_new4 |
|1 |2 |0 |2 |something_even_newer|
|1 |4 |0 |1 |something6 |
+----+----+-------+---------+--------------------+
```
// max on the struct compares its fields left to right, so it picks the row
// with the newest timestamp per (key1, key2)
val processedDf1 = df1.unionByName(df2)
  .withColumn("timestamp_attributes", struct("timestamp", "attributes", "subkey3"))
  .groupBy("key1", "key2")
  .agg(max("timestamp_attributes").as("attributes"))
processedDf1.show(false)
```
+----+----+----------------------------+
|key1|key2|attributes |
+----+----+----------------------------+
|1 |1 |[1, something_new3, 2] |
|1 |2 |[2, something_even_newer, 0]|
|1 |3 |[0, something5, 0] |
|1 |4 |[1, something6, 0] |
+----+----+----------------------------+
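To get from this grouped result to the final layout shown next, the struct column has to be unpacked back into individual columns. The answer does not show that step; a sketch, reusing the struct field names from above (the name `resultDf1` is mine), would be:
```
// assumption: this select is omitted in the original answer; it flattens the
// struct back into the original column layout
val resultDf1 = processedDf1.select(
  col("key1"),
  col("key2"),
  col("attributes.subkey3").as("subkey3"),
  col("attributes.timestamp").as("timestamp"),
  col("attributes.attributes").as("attributes")
)
resultDf1.show(false)
```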
+----+----+-------+---------+--------------------+
|key1|key2|subkey3|timestamp|attributes |
+----+----+-------+---------+--------------------+
|1 |1 |2 |1 |something_new3 |
|1 |2 |0 |2 |something_even_newer|
|1 |3 |0 |0 |something5 |
|1 |4 |0 |1 |something6 |
+----+----+-------+---------+--------------------+
```
// alternative: collect every (timestamp, attributes, subkey3) struct per key,
// sort the array, and keep its last (i.e. largest) element
val processedDf = df1.unionByName(df2)
  .withColumn("timestamp_attributes", struct("timestamp", "attributes", "subkey3"))
  .groupBy("key1", "key2")
  .agg(collect_list("timestamp_attributes").as("attributes"))
  .withColumn("attributes", element_at(array_sort(col("attributes")), size(col("attributes"))))
```
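As before, the struct column still has to be unpacked with the same kind of select (again my assumption, since the answer only shows the flattened output below):
```
// same hypothetical flattening step as above
processedDf.select(
  col("key1"), col("key2"),
  col("attributes.subkey3").as("subkey3"),
  col("attributes.timestamp").as("timestamp"),
  col("attributes.attributes").as("attributes")
).show(false)
```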
+----+----+-------+---------+--------------------+
|key1|key2|subkey3|timestamp|attributes |
+----+----+-------+---------+--------------------+
|1 |1 |2 |1 |something_new3 |
|1 |2 |0 |2 |something_even_newer|
|1 |3 |0 |0 |something5 |
|1 |4 |0 |1 |something6 |
+----+----+-------+---------+--------------------+