基于Pyspark中的多列从dataframe中删除重复行

kdfy810k  于 2023-08-03  发布在  Spark
关注(0)|答案(1)|浏览(132)

如果我有一个事件表:
| 单位|状态1|状态;状态| state n |
| --|--|--| ------------ |
| 一个|x轴|n_1| n_1 |
| 一个|x轴|氮气| n_2 |
| 一个|y| n_3| n_3 |
| 一个|x轴|四氮| n_4 |
| B| x轴|n_5| n_5 |
假设我们希望仅基于状态1按单元保留事件的第一次发生。这意味着02:00时的观察结果不相关,我们希望将其删除。
删除重复项是常见的,但如果我们试图获取状态1的不同值,我们也会删除04:00的事件并获取此表。
| 单位|状态1|状态;状态| state n |
| --|--|--| ------------ |
| 一个|x轴|n_1| n_1 |
| 一个|y| n_3| n_3 |
| B| x轴|n_5| n_5 |
我们仍然希望将值保持在04:00,以避免丢失状态更改的信息,因此本质上我们不希望删除重复项,而是希望删除重复行。
此外,我们还需要考虑n>=2状态的情况。
如何根据几列删除/过滤掉包含重复数据的行。

3phpmpom

3phpmpom1#

我对此的解决方案是将所有状态的列作为一个结构体添加,并使用lag/lead检查前面的结构体是否相同。

windowPart = partitionBy('unit').orderBy(col('timestamp'))

df = (
spark.read.table("events")
    .select(
    "*",
    struct('state 1', 'state 2', 'state 1-n', 'state n').alias('states')
    .withColumn(
    'repeating',
    lead('states').over('windowPart') == col('states')
    )
    .filter(col('repeating') == True)
    .drop('states')
)

字符串
这是可行的,但是为它创建一个单独的列结构体,然后删除它,感觉没有必要。
有没有更好的方法来做到这一点,或者我应该让执行计划的优化完成它的工作,并对这个解决方案感到满意?

相关问题