Spark merge (replace) for keys that span multiple rows

qeeaahzv asked on 2021-05-27 in Spark

I am using Apache Spark and want to merge two DataFrames, one containing existing data and the other containing (potential) updates. The merge should happen on a given set of key attributes; however, for one combination of key attributes there can be several existing rows that need to be replaced by several new rows (if the update data carries a newer timestamp than the existing data).
As a visualization, suppose the existing data is

+------+------+---------+-----------+------------+
| key1 | key2 | subkey3 | timestamp | attributes |
+------+------+---------+-----------+------------+
|    1 |    1 |       0 |         0 | something1 |
|    1 |    1 |       1 |         0 | something2 |
|    1 |    2 |       0 |         0 | something3 |
|    1 |    2 |       1 |         0 | something4 |
|    1 |    3 |       0 |         0 | something5 |
+------+------+---------+-----------+------------+

and the update data is

+------+------+---------+-----------+----------------------+
| key1 | key2 | subkey3 | timestamp |      attributes      |
+------+------+---------+-----------+----------------------+
|    1 |    1 |       0 |         1 | something_new1       |
|    1 |    1 |       1 |         1 | something_new2       |
|    1 |    1 |       2 |         1 | something_new3       |
|    1 |    2 |       0 |         1 | something_new4       |
|    1 |    2 |       0 |         2 | something_even_newer |
|    1 |    4 |       0 |         1 | something6           |
+------+------+---------+-----------+----------------------+

then the resulting DataFrame should look like this:

+------+------+---------+-----------+----------------------+
| key1 | key2 | subkey3 | timestamp |      attributes      |
+------+------+---------+-----------+----------------------+
|    1 |    1 |       0 |         1 | something_new1       |
|    1 |    1 |       1 |         1 | something_new2       |
|    1 |    1 |       2 |         1 | something_new3       |
|    1 |    2 |       0 |         2 | something_even_newer |
|    1 |    3 |       0 |         0 | something5           |
|    1 |    4 |       0 |         1 | something6           |
+------+------+---------+-----------+----------------------+

So in this example the merge happens on the two keys `key1` and `key2`: whenever the update data contains newer rows for a composite key, all existing rows for that key are replaced by the newest rows from the update data. Note that the number of rows for a given composite key can change in either direction once the update is applied.
One way to solve this is a window ranking like the following:

from pyspark.sql import Window
from pyspark.sql.functions import col, rank

df_merged = (
    df_old
    .union(df_update)
    .withColumn(
        "rank",
        rank().over(
            Window.partitionBy(col("key1"), col("key2"))
                  .orderBy(col("timestamp").desc())
        ),
    )
    # rank() assigns 1 to every row tied on the newest timestamp,
    # so all of the latest rows for a (key1, key2) pair are kept
    .filter(col("rank") == 1)
    .drop("rank")
)

Assuming all the data is stored in parquet or Delta tables, what is the most efficient way to get this behavior in Spark?
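For concreteness, here is a rough sketch (in Scala, with a hypothetical table path, and assuming the table is partitioned by `key1`) of how the merged result might be written back over a parquet table with dynamic partition overwrite; I am not sure whether this is the most efficient route:

```scala
// only rewrite the key1 partitions that appear in the merged result,
// leaving all other partitions of the table untouched
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")

// dfMerged stands for the windowed merge result above;
// the output path and the partition column are hypothetical
dfMerged.write
  .mode("overwrite")
  .partitionBy("key1")
  .parquet("/data/merged_table")
```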


xsuvu9jc1#

I tried to solve this with the approaches below.

Code

Read the two DataFrames. DataFrame-1:

```scala
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

val implicits = spark.implicits
import implicits._

val schema = StructType(
  Array(
    StructField("key1", DataTypes.IntegerType),
    StructField("key2", DataTypes.IntegerType),
    StructField("subkey3", DataTypes.IntegerType),
    StructField("timestamp", DataTypes.IntegerType),
    StructField("attributes", DataTypes.StringType)
  )
)

val data1 =
  """
    | 1 | 1 | 0 | 0 | something1
    | 1 | 1 | 1 | 0 | something2
    | 1 | 2 | 0 | 0 | something3
    | 1 | 2 | 1 | 0 | something4
    | 1 | 3 | 0 | 0 | something5
  """.stripMargin

val df1 = spark.read
  .schema(schema)
  .option("sep", "|")
  // stripMargin removes the leading "|" of every line; stripping the
  // remaining whitespace leaves clean "|"-separated CSV rows
  .csv(data1.split(System.lineSeparator()).map(_.replaceAll("\\s*", "")).toSeq.toDS())
df1.show(false)
df1.printSchema()
```
Result:

+----+----+-------+---------+----------+
|key1|key2|subkey3|timestamp|attributes|
+----+----+-------+---------+----------+
|1   |1   |0      |0        |something1|
|1   |1   |1      |0        |something2|
|1   |2   |0      |0        |something3|
|1   |2   |1      |0        |something4|
|1   |3   |0      |0        |something5|
+----+----+-------+---------+----------+

root
 |-- key1: integer (nullable = true)
 |-- key2: integer (nullable = true)
 |-- subkey3: integer (nullable = true)
 |-- timestamp: integer (nullable = true)
 |-- attributes: string (nullable = true)
DataFrame-2:

```scala
val data2 =
  """
    | 1 | 1 | 0 | 1 | something_new1
    | 1 | 1 | 1 | 1 | something_new2
    | 1 | 1 | 2 | 1 | something_new3
    | 1 | 2 | 0 | 1 | something_new4
    | 1 | 2 | 0 | 2 | something_even_newer
    | 1 | 4 | 0 | 1 | something6
  """.stripMargin

val df2 = spark.read
  .schema(schema)
  .option("sep", "|")
  .csv(data2.split(System.lineSeparator()).map(_.replaceAll("\\s*", "")).toSeq.toDS())

df2.show(false)
```
Result:

+----+----+-------+---------+--------------------+
|key1|key2|subkey3|timestamp|attributes          |
+----+----+-------+---------+--------------------+
|1   |1   |0      |1        |something_new1      |
|1   |1   |1      |1        |something_new2      |
|1   |1   |2      |1        |something_new3      |
|1   |2   |0      |1        |something_new4      |
|1   |2   |0      |2        |something_even_newer|
|1   |4   |0      |1        |something6          |
+----+----+-------+---------+--------------------+


## Approach 1 (recommended)

Create a `struct<timestamp, attributes, subkey3>`, group by `key1` and `key2`, and take `max(struct<timestamp, attributes, subkey3>)` to reduce each group to a single element. Since structs compare field by field in declaration order, `timestamp` must be the first field for `max` to pick the newest row.

```scala
val processedDf1 = df1.unionByName(df2)
  // timestamp comes first, so struct comparison orders by timestamp
  .withColumn("timestamp_attributes", struct("timestamp", "attributes", "subkey3"))
  .groupBy("key1", "key2")
  .agg(max("timestamp_attributes").as("attributes"))

processedDf1.show(false)

processedDf1.selectExpr("key1", "key2",
    "attributes.subkey3 as subkey3",
    "attributes.timestamp as timestamp",
    "attributes.attributes as attributes")
  .show(false)
```
Result:

+----+----+----------------------------+
|key1|key2|attributes                  |
+----+----+----------------------------+
|1   |1   |[1, something_new3, 2]      |
|1   |2   |[2, something_even_newer, 0]|
|1   |3   |[0, something5, 0]          |
|1   |4   |[1, something6, 0]          |
+----+----+----------------------------+

+----+----+-------+---------+--------------------+
|key1|key2|subkey3|timestamp|attributes          |
+----+----+-------+---------+--------------------+
|1   |1   |2      |1        |something_new3      |
|1   |2   |0      |2        |something_even_newer|
|1   |3   |0      |0        |something5          |
|1   |4   |0      |1        |something6          |
+----+----+-------+---------+--------------------+
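To see why `max(struct(...))` picks the newest row: Spark compares structs field by field in declaration order, so the first field (`timestamp`) dominates the comparison. A minimal sketch, reusing the implicits imported above (the toy values are illustrative):

```scala
// struct comparison is lexicographic by field position,
// so max orders these rows by "timestamp" first
Seq((0, "old"), (1, "newer"), (2, "newest"))
  .toDF("timestamp", "attributes")
  .agg(max(struct("timestamp", "attributes")).as("latest"))
  .show(false)
// prints a single row: [2, newest]
```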


## Approach 2

#### Find the latest value per group, ordered by timestamp

Create a `struct<timestamp, attributes, subkey3>`, group by `key1` and `key2`, and collect all the struct elements with `collect_list`.
Sort the collected elements by timestamp and take the latest one.
Then select the individual fields out of that struct.

```scala
val processedDf = df1.unionByName(df2)
  .withColumn("timestamp_attributes", struct("timestamp", "attributes", "subkey3"))
  .groupBy("key1", "key2")
  .agg(collect_list("timestamp_attributes").as("attributes"))
  // array_sort orders the structs ascending (by timestamp first);
  // element_at with the array size picks the last, i.e. newest, element
  .withColumn("attributes", element_at(array_sort(col("attributes")), size(col("attributes"))))

processedDf.selectExpr("key1", "key2",
    "attributes.subkey3 as subkey3",
    "attributes.timestamp as timestamp",
    "attributes.attributes as attributes")
  .show(false)
```
Result:

+----+----+-------+---------+--------------------+
|key1|key2|subkey3|timestamp|attributes          |
+----+----+-------+---------+--------------------+
|1   |1   |2      |1        |something_new3      |
|1   |2   |0      |2        |something_even_newer|
|1   |3   |0      |0        |something5          |
|1   |4   |0      |1        |something6          |
+----+----+-------+---------+--------------------+

Note that if a group holds a large number of rows per key, I would advise against `collect_list`, since the executors may run short of memory; a window-based sketch that avoids this follows below.
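A window-based variant (similar to the one in the question) avoids collecting per-group arrays, and it also keeps every row tied on the newest timestamp, matching the expected output where one composite key can map to several rows. A minimal sketch, reusing `df1` and `df2` from above:

```scala
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// newest timestamp per composite key, computed without collecting rows
val w = Window.partitionBy("key1", "key2")

val merged = df1.unionByName(df2)
  .withColumn("max_ts", max("timestamp").over(w))
  // keep every row tied on the newest timestamp, so one composite key
  // can still yield several subkey3 rows after the merge
  .filter(col("timestamp") === col("max_ts"))
  .drop("max_ts")

merged.show(false)
```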
