在不移动数据库的情况下,计算特定条件下的数据库(PySpark)

yvfmudvl  于 2023-11-16  发布在  Spark
关注(0)|答案(1)|浏览(119)

因为我是编程新手,或者至少我是基础知识新手,所以我面临着一个问题,我不知道如何计算PySpark中的“周期”。
让我们假设这是DataFrame:
| 信|组|
| --|--|
| 一| 0 |
| 一| 0 |
| 一| 1 |
| 一| 1 |
| 一| 1 |
| 一| 0 |
| 一| 0 |
| 一| 1 |
| 一| 1 |
| 一| 0 |
| 一| 0 |
我想看到的结果是有多少“周期(两个或X #1是连续的)".在这种情况下将是:

  • 2个循环(下表显示了分开的“循环”)。

我怎样才能达到这个结果?
| 信|组|
| --|--|
| 一| 0 |
| 一| 0 |
| --|--|
| 一| 1 |
| 一| 1 |
| 一| 1 |
| --|--|
| 一| 0 |
| 一| 0 |
| --|--|
| 一| 1 |
| 一| 1 |
| --|--|
| 一| 0 |
| 一| 0 |
如果你能帮助我一个例子或一个链接,如果这是已经提到的将不胜感激。
我试了几个过滤器,但都没有成功。
先谢了。

jdzmm42g

jdzmm42g1#

您需要一种方法来保留当前的行顺序,以便对周期进行计数。为此,可以使用函数monotonically_increasing_id()为每行分配一个唯一的“递增值”,以便我们按该列进行排序,原始行序列被保留。一旦就位,我们可以使用基于窗口的计算(需要对行进行排序)来提供周期计数。
注:不确定2个连续行的要求,下面没有处理

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Create SparkSession
spark = SparkSession.builder \
          .appName('CycleCount') \
          .getOrCreate()

# Assume "df" as the DataFrame
df.show()

# Create a new column "index" so that we can retain the current row order
df = df.withColumn("index", F.monotonically_increasing_id())

# Define the window ordered by the "index" which retains current row order (per letter)
window = Window.partitionBy("letter").orderBy("index")

# Add new column "change" that flags when a change in 'group' occurs
df = df.withColumn("change", F.when(F.lag("group").over(window) != F.col("group"), 1).otherwise(0))

# Use cumsum() for cumulative sum of these changes, which will serve as an identifier for each cycle
df = df.withColumn("cycle", F.sum("change").over(window))

# Filter out groups with only 1 row
df = df.filter(F.col("count") > 1)

# Count the number of cycles
cycle_count = df.groupBy("letter", "cycle").count().count()

print("Number of cycles: ", cycle_count)

字符串

相关问题