通过使用时间列和持续时间变量随机选择行数的Pyspark聚合

tcbh2hod  于 2023-11-16  发布在  Spark
关注(0)|答案(2)|浏览(97)

在pyspark中,我有一个类似于以下示例的框架:

id, execution_time, sym, qty
========================================
1, 2023-10-27 15:01:24.2200, aa1, 100
2, 2023-10-27 15:15:21.2200, aa1, 250
3, 2023-10-27 15:27:24.2200, aa2, 350
4, 2023-10-27 15:35:25.2200, aa3, 400
5, 2023-10-27 16:00.25.2200, aa3, 500
6, 2023-10-27 16:15:24.2200, aa4, 100
7, 2023-10-27 16:55:24.2200, aa1, 100
8, 2023-10-27 16:50:24.2200, aa2, 100
========================================

字符串
现在我的要求是:我有一个'duration'变量,这个变量的值是30 #分钟现在从第一行开始,我需要应用duration变量的值,然后我需要像下面这样对这些行进行分组-所以,在这个样本数据中,在应用'duration'变量之后,我应该可以分组到第三行。因为第四行的时间大于第一行+持续时间。(我们在第一行应用了持续时间)
现在我需要再次从第4行开始并应用duration变量,这次我们应该只对第4行和第5行进行分组,因为第6行的时间大于第4行+ duration。
现在我需要再次从第6行开始并应用持续时间变量,这次我们应该只对第6行进行分组,因为第7行的时间大于第6行+持续时间。
换句话说:因此,在对一行的time列应用duration之后(假设这是我们的结果),我们需要选择下一行的time > result的所有即将到来的行,然后选择下一行并应用duration。
是否可以标记所有这些行,并将其存储在一个新列中,这福尔斯符合上述条件?因为稍后我需要进行聚合。

y53ybaqx

y53ybaqx1#

我试着用简单的方式做这件事。
1.首先创建一个Window spec,这样我们就可以收集所有属于window_spec的id。
1.获取这些收集的id列表的计数并将其存储在列中。
1.提取出id并计数到一个单独的列表中进行顺序迭代处理,因为它很复杂,不能直接在Window函数中实现。
1.根据需要处理提取的元组列表。
1.从上面处理的列表中创建一个框架。
下面是python脚本:

import pyspark.sql.functions as F
from pyspark import SparkContext, SQLContext
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.window import Window

sc = SparkContext('local')
sqlContext = SQLContext(sc)

data1 = [

[1, "2023-10-27 15:01:24.2200", "aa1", 100],
[2, "2023-10-27 15:15:21.2200", "aa1", 250],
[3, "2023-10-27 15:27:24.2200", "aa2", 350],
[4, "2023-10-27 15:35:25.2200", "aa3", 400],
[5, "2023-10-27 16:00:25.2200", "aa3", 500],
[6, "2023-10-27 16:15:24.2200", "aa4", 100],
[7, "2023-10-27 16:55:24.2200", "aa1", 100],
[8, "2023-10-27 16:50:24.2200", "aa2", 100],

]

columns1 =["id", "execution_time", "sym", "qty"]

df1 = sqlContext.createDataFrame(data=data1, schema=columns1)

df1 = df1.withColumn("execution_time", F.to_timestamp(F.col("execution_time")))
df1 = df1.withColumn("unix_timestamp", F.unix_timestamp("execution_time"))

df1.show(n=20, truncate=False)

print("dataframe df1 schema")
print(df1.schema)

window_spec = (
    Window
    .orderBy(F.col("unix_timestamp"))
    .rangeBetween(0, 30*60 ) # duration in seconds
)

calculated_columns_df = df1.withColumn("collected_ids", F.collect_list("id").over(window_spec))
calculated_columns_df = calculated_columns_df.withColumn("count_of_ids", F.count("collected_ids").over(window_spec))

print("Calculated Dataframe : Window Aggregation Function Applied")
calculated_columns_df.show(n=100, truncate=False)

collect_id_maxid_pairs = calculated_columns_df.select("id", "unix_timestamp", "count_of_ids").orderBy("unix_timestamp").rdd.map(lambda ele : (int(ele["id"]), int(ele["count_of_ids"]))).collect()

id_list_only = [ele[0] for ele in collect_id_maxid_pairs]
count_list_only = [ele[1] for ele in collect_id_maxid_pairs]

grouped_ids = []

iter_over_count_list =  iter(count_list_only)

try:
    while True:
        element = next(iter_over_count_list)
        print(f"accepted count = {element}")
        grouping_value = id_list_only[: element]
        grouped_ids.append((id_list_only[0], grouping_value))
        id_list_only = id_list_only[element : ]

        for idx in range(element-1):
            discarded_count = next(iter_over_count_list)
            print(f"discarded count = {discarded_count}")

except StopIteration:
    print("Reached the end of the iterator.")

auxilary_df = sqlContext.createDataFrame(grouped_ids, ["id", "ids_within_group"])

print("auxilary_df : ")
auxilary_df.show(n=100, truncate=False)

字符串
输出量:

+---+----------------------+---+---+--------------+
|id |execution_time        |sym|qty|unix_timestamp|
+---+----------------------+---+---+--------------+
|1  |2023-10-27 15:01:24.22|aa1|100|1698399084    |
|2  |2023-10-27 15:15:21.22|aa1|250|1698399921    |
|3  |2023-10-27 15:27:24.22|aa2|350|1698400644    |
|4  |2023-10-27 15:35:25.22|aa3|400|1698401125    |
|5  |2023-10-27 16:00:25.22|aa3|500|1698402625    |
|6  |2023-10-27 16:15:24.22|aa4|100|1698403524    |
|7  |2023-10-27 16:55:24.22|aa1|100|1698405924    |
|8  |2023-10-27 16:50:24.22|aa2|100|1698405624    |
+---+----------------------+---+---+--------------+

dataframe df1 schema
StructType([StructField('id', LongType(), True), StructField('execution_time', TimestampType(), True), StructField('sym', StringType(), True), StructField('qty', LongType(), True), StructField('unix_timestamp', LongType(), True)])
Calculated Dataframe : Window Aggregation Function Applied

+---+----------------------+---+---+--------------+-------------+------------+
|id |execution_time        |sym|qty|unix_timestamp|collected_ids|count_of_ids|
+---+----------------------+---+---+--------------+-------------+------------+
|1  |2023-10-27 15:01:24.22|aa1|100|1698399084    |[1, 2, 3]    |3           |
|2  |2023-10-27 15:15:21.22|aa1|250|1698399921    |[2, 3, 4]    |3           |
|3  |2023-10-27 15:27:24.22|aa2|350|1698400644    |[3, 4]       |2           |
|4  |2023-10-27 15:35:25.22|aa3|400|1698401125    |[4, 5]       |2           |
|5  |2023-10-27 16:00:25.22|aa3|500|1698402625    |[5, 6]       |2           |
|6  |2023-10-27 16:15:24.22|aa4|100|1698403524    |[6]          |1           |
|8  |2023-10-27 16:50:24.22|aa2|100|1698405624    |[8, 7]       |2           |
|7  |2023-10-27 16:55:24.22|aa1|100|1698405924    |[7]          |1           |
+---+----------------------+---+---+--------------+-------------+------------+

accepted count = 3
discarded count = 3
discarded count = 2
accepted count = 2
discarded count = 2
accepted count = 1
accepted count = 2
discarded count = 1
Reached the end of the iterator.
 auxilary_df : 
+---+----------------+
|id |ids_within_group|
+---+----------------+
|1  |[1, 2, 3]       |
|4  |[4, 5]          |
|6  |[6]             |
|8  |[8, 7]          |
+---+----------------+

k3fezbri

k3fezbri2#

你可以收集所有的日期行作为一个结构体列表,然后使用aggregate函数来匹配结束时间。

data_sdf. \
    groupBy(func.lit('gk')). \
    agg(func.sort_array(func.collect_list(func.struct('id', 'exec_time', 'sym', 'qty'))).alias('bork')). \
    withColumn('bork2',
               func.aggregate(func.expr('slice(bork, 2, size(bork))'),
                              func.array(func.col('bork')[0].withField('end', func.expr('bork[0].exec_time + interval 30 minutes'))),
                              lambda x, y: func.array_union(x,
                                                            func.array(y.withField('end', 
                                                                                   func.when(y.exec_time <= func.element_at(x, -1).end, func.element_at(x, -1).end).
                                                                                   otherwise(y.exec_time + func.expr('interval 30 minutes'))
                                                                                   )
                                                                       )
                                                            )
                              )
               ). \
    selectExpr('inline(bork2)'). \
    withColumn('id_grp', func.collect_list('id').over(wd.partitionBy('end'))). \
    show(truncate=False)

# +---+----------------------+---+---+----------------------+---------+
# |id |exec_time             |sym|qty|end                   |id_grp   |
# +---+----------------------+---+---+----------------------+---------+
# |1  |2023-10-27 15:01:24.22|aa1|100|2023-10-27 15:31:24.22|[1, 2, 3]|
# |2  |2023-10-27 15:15:21.22|aa1|250|2023-10-27 15:31:24.22|[1, 2, 3]|
# |3  |2023-10-27 15:27:24.22|aa2|350|2023-10-27 15:31:24.22|[1, 2, 3]|
# |4  |2023-10-27 15:35:25.22|aa3|400|2023-10-27 16:05:25.22|[4, 5]   |
# |5  |2023-10-27 16:00:25.22|aa3|500|2023-10-27 16:05:25.22|[4, 5]   |
# |6  |2023-10-27 16:15:24.22|aa4|100|2023-10-27 16:45:24.22|[6]      |
# |7  |2023-10-27 16:55:24.22|aa1|100|2023-10-27 17:25:24.22|[7, 8]   |
# |8  |2023-10-27 16:50:24.22|aa2|100|2023-10-27 17:25:24.22|[7, 8]   |
# +---+----------------------+---+---+----------------------+---------+

字符串
aggregate函数的工作方式类似于python的reduce。它递归地将逻辑应用于数组的元素。在这种情况下,我不断地用exec_time检查结束日期,如果exec_time大于当前结束日期,则使用exec_time来计算结束日期。

# the arrays
data_sdf. \
    groupBy(func.lit('gk')). \
    agg(func.sort_array(func.collect_list(func.struct('id', 'exec_time'))).alias('bork')). \
    withColumn('bork2',
               func.aggregate(func.expr('slice(bork, 2, size(bork))'),
                              func.array(func.col('bork')[0].withField('end', func.expr('bork[0].exec_time + interval 30 minutes'))),
                              lambda x, y: func.array_union(x,
                                                            func.array(y.withField('end', 
                                                                                   func.when(y.exec_time <= func.element_at(x, -1).end, func.element_at(x, -1).end).
                                                                                   otherwise(y.exec_time + func.expr('interval 30 minutes'))
                                                                                   )
                                                                       )
                                                            )
                              )
               ). \
    show(truncate=False)

# +---+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
# |gk |bork                                                                                                                                                                                                                                    |bork2                                                                                                                                                                                                                                                                                                                                                                                                                                   |
# +---+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
# |gk |[{1, 2023-10-27 15:01:24.22}, {2, 2023-10-27 15:15:21.22}, {3, 2023-10-27 15:27:24.22}, {4, 2023-10-27 15:35:25.22}, {5, 2023-10-27 16:00:25.22}, {6, 2023-10-27 16:15:24.22}, {7, 2023-10-27 16:55:24.22}, {8, 2023-10-27 16:50:24.22}]|[{1, 2023-10-27 15:01:24.22, 2023-10-27 15:31:24.22}, {2, 2023-10-27 15:15:21.22, 2023-10-27 15:31:24.22}, {3, 2023-10-27 15:27:24.22, 2023-10-27 15:31:24.22}, {4, 2023-10-27 15:35:25.22, 2023-10-27 16:05:25.22}, {5, 2023-10-27 16:00:25.22, 2023-10-27 16:05:25.22}, {6, 2023-10-27 16:15:24.22, 2023-10-27 16:45:24.22}, {7, 2023-10-27 16:55:24.22, 2023-10-27 17:25:24.22}, {8, 2023-10-27 16:50:24.22, 2023-10-27 17:25:24.22}]|
# +---+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

# root
#  |-- gk: string (nullable = false)
#  |-- bork: array (nullable = false)
#  |    |-- element: struct (containsNull = false)
#  |    |    |-- id: long (nullable = true)
#  |    |    |-- exec_time: timestamp (nullable = true)
#  |-- bork2: array (nullable = true)
#  |    |-- element: struct (containsNull = true)
#  |    |    |-- id: long (nullable = true)
#  |    |    |-- exec_time: timestamp (nullable = true)
#  |    |    |-- end: timestamp (nullable = true)

相关问题