再次找到值所在的行

vlf7wbxs  于 2021-07-12  发布在  Spark
关注(0)|答案(2)|浏览(260)

我有这样一个Dataframe:

+-----+----+----+--------+
|index|name| Num|solution|
+-----+----+----+--------+
|    0|   a|1000|    true|
|    1|   a|2000|    true|
|    2|   a| 300|   false|
|    3|   a| 400|    true|
|    4|   a|2100|    true|
|    5|   a|2200|    true|
+-----+----+----+--------+

我现在想更新我的解决方案列。如果值( Num )在第一次达到或超过“drop”(这里drop之前的值是2000)之前,我想将所有bools从“drop”开始设置为false,直到该点之后。因此,预期结果将是:

+-----+----+----+---------------+
|index|name| Num|solution_update|
+-----+----+----+---------------+
|    0|   a|1000|           true|
|    1|   a|2000|           true|
|    2|   a| 300|          false|
|    3|   a| 400|          false|
|    4|   a|2100|          false|
|    5|   a|2200|           true|
+-----+----+----+---------------+

我觉得我缺少了解决这个问题的基本思路:-
我可以在拖放前检测行中的值:

my_window = Window.partitionBy('name').orderBy(F.col('index'))

df= df.withColumn('lag1', F.lag(F.col('Num'), -1)
                    .over(my_window).cast('bigint'))
df= df.withColumn('help',
                        (F.when((F.col('lag1'))
                                    < (F.col('Num')), False)))
+-----+----+----+--------+----+-----+
|index|name| Num|solution|lag1| help|
+-----+----+----+--------+----+-----+
|    0|   a|1000|    true|2000| null|
|    1|   a|2000|    true| 300|false|
|    2|   a| 300|   false| 400| null|
|    3|   a| 400|    true|2100| null|
|    4|   a|2100|    true|2200| null|
|    5|   a|2200|    true|null| null|
+-----+----+----+--------+----+-----+

但现在我不知道如何搜索“第一个值等于或大于”比

df.where(F.col('help')==False)['Num']

有人能帮忙吗?

y1aodyip

y1aodyip1#

这可不容易。我会这样做的。希望这些列是不言自明的:)但是一定要问你是否不清楚任何列的含义。

from pyspark.sql import functions as F, Window

my_window = Window.partitionBy('name').orderBy(F.col('index'))

df2 = df.withColumn(
    'drop',
    F.when(F.col('Num') < F.lag('Num').over(my_window), F.lag('Num').over(my_window))
).withColumn(
    'num_before_drop',
    F.last('drop', ignorenulls=True).over(my_window)
).withColumn(
    'surpass',
    F.col('Num') > F.col('num_before_drop')
).withColumn(
    'first_surpass',
    F.col('surpass') & ~F.lag('surpass').over(my_window)
).withColumn(
    'solution_update',
    F.when(~F.col('surpass') | F.col('first_surpass'), F.lit(False))
     .otherwise(F.col('solution'))
)

df2.show()
+-----+----+----+--------+----+---------------+-------+-------------+---------------+
|index|name| Num|solution|drop|num_before_drop|surpass|first_surpass|solution_update|
+-----+----+----+--------+----+---------------+-------+-------------+---------------+
|    0|   a|1000|    true|null|           null|   null|         null|           true|
|    1|   a|2000|    true|null|           null|   null|         null|           true|
|    2|   a| 300|   false|2000|           2000|  false|        false|          false|
|    3|   a| 400|    true|null|           2000|  false|        false|          false|
|    4|   a|2100|    true|null|           2000|   true|         true|          false|
|    5|   a|2200|    true|null|           2000|   true|        false|           true|
+-----+----+----+--------+----+---------------+-------+-------------+---------------+
2vuwiymt

2vuwiymt2#

还有一种方法:

from pyspark.sql.window import Window
from pyspark.sql import functions as F

win = Window.partitionBy('name').orderBy('index')

# Get "largest so far"

df = df.withColumn('max_num', F.max('Num').over(win))
df = df.withColumn('running_max', F.expr('max_num > Num'))

# Now lag the largest so far and form a combined indicator

df = df.withColumn('lag_running_max', F.lag('running_max').over(win)).na.fill(False)
df =  df.withColumn('combined_indicator', F.expr("running_max or lag_running_max"))

# update solution column

df = df.withColumn('updated_solution', F.when(F.col("combined_indicator"), False).otherwise(F.col("solution")))

相关问题