python: Is it possible to iterate over dates in a PySpark DataFrame for a given date range?

3phpmpom posted on 2022-12-02 in Python

I'm not sure whether my approach is correct, but I'm trying to see if I can output multiple dates objects, e.g. dates_1, dates_2, dates_3, or even an array if possible, each spanning 7 days, for example dates_1 = ("2022-08-20", "2022-08-27"), dates_2 = ("2022-08-28", "2022-09-04"), dates_3 = ("2022-09-05", "2022-09-12"), and so on. So far I have entered the date range manually:

from pyspark.sql.functions import to_date, col, lit, asc
from pyspark.sql.types import TimestampType

dates = ("2022-09-18", "2022-09-25")  # Manually input date range of 7 days

# Convert the string bounds into timestamp literals for the comparison
date_from, date_to = [to_date(lit(s)).cast(TimestampType()) for s in dates]

# Keep only rows whose datetype falls inside the range, oldest first
New = NewSpark.where((NewSpark.datetype >= date_from) & (NewSpark.datetype <= date_to))
New = New.orderBy(asc("datetype"))

New.show(10)

This outputs the DataFrame with datetype restricted to the 7-day date range ("2022-09-18" to "2022-09-25"):

+----------+-------------------+------------------+
|  datetype|            user_id|         course_id|
+----------+-------------------+------------------+
|2022-09-20|-436631365164282159|193340000000013715|
|2022-09-20| 280876762290791430|193340000000016827|
|2022-09-20| 445049443233666362|                \N|
|2022-09-20|-322720029403815673|193340000000016900|
|2022-09-20| 491178362284543871|                \N|
|2022-09-20|  13092183588131224|                \N|
|2022-09-20|                 \N|                \N|
|2022-09-20|-367908093210940595|                \N|
|2022-09-20|  72800797765911039|193340000000014279|
|2022-09-20| -14158652536236447|193340000000013898|
+----------+-------------------+------------------+
only showing top 10 rows

I have a full PySpark DataFrame (NewSpark):

+----------+-------------------+------------------+
|  datetype|            user_id|         course_id|    
+----------+-------------------+------------------+
|2022-09-15| 425465600693903129|                \N|
|2022-09-15| 508873040735657962|193340000000014379|
|2022-09-15| 284347190388427414|193340000000014966|
|2022-09-15|-486512951318998054|193340000000018519|
|2022-09-15| 125529631549145496|                \N|
|2022-09-15| 125529631549145496|                \N|
|2022-09-15| 557089411379160781|193340000000016522|
|2022-09-15| 522439159932067624|                \N|
|2022-09-15|-405858644089907758|                \N|
|2022-09-15|-260152078780427420|                \N|
+----------+-------------------+------------------+
only showing top 10 rows

with the following dates:

+----------+------+
|  datetype| count|
+----------+------+
|2022-09-21|  1498|
|2022-09-20|305696|
|2022-09-16|  1668|
|2022-09-15|998332|
|2022-09-11|  2345|
|2022-09-10|997655|
|2022-09-05|  6895|
|2022-09-04|993101|
|2022-09-03|     4|
|2022-08-28|  3093|
|2022-08-27|997945|
|2022-08-26|998962|
|2022-08-25|  2493|
|2022-08-24|997507|
|2022-08-19|  2613|
|2022-08-18|999524|
|2022-08-17|997863|
+----------+------+

Is there a way to complete the code above without entering the date ranges manually? Or is there a better way?
Perhaps using something like the following?:

from datetime import timedelta, date

def daterange(date1, date2):
    # Yield the start date of each 7-day window between date1 and date2
    for n in range(0, (date2 - date1).days, 7):
        yield date1 + timedelta(n)

start_dt = date(2022, 8, 20)
end_dt = date(2022, 12, 3)
for dt in daterange(start_dt, end_dt):
    print(dt.strftime("%Y-%m-%d"))
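
For example, pairing each start date produced by this generator with an end date six days later gives (start, end) tuples like the dates_1, dates_2 above. This is a minimal sketch building on the daterange generator; the six-day offset, which gives non-overlapping inclusive 7-day windows, is one possible convention:

from datetime import timedelta, date

# Build (start, end) string tuples from the weekly start dates;
# start + 6 days gives non-overlapping, inclusive 7-day windows
weekly_ranges = [
    (dt.isoformat(), (dt + timedelta(days=6)).isoformat())
    for dt in daterange(date(2022, 8, 20), date(2022, 12, 3))
]
print(weekly_ranges[:3])
# [('2022-08-20', '2022-08-26'), ('2022-08-27', '2022-09-02'), ('2022-09-03', '2022-09-09')]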

eoxn13cs 1#

To output multiple date objects with 7-day ranges, you can generate the date windows in a loop and then apply the same filtering logic to the DataFrame for each one. Here is an example of how to do this:

from datetime import date, timedelta
from pyspark.sql.functions import asc

# Start and end dates for the overall period to cover
start_date = date(2022, 1, 1)
end_date = date(2022, 12, 31)

# Build the (date_from, date_to) 7-day windows in plain Python,
# stepping one week at a time so the windows do not overlap
windows = []
current = start_date
while current <= end_date:
    windows.append((current, current + timedelta(days=6)))
    current += timedelta(days=7)

# Filter the original DataFrame once per window and collect the results
date_ranges = []
for date_from, date_to in windows:
    df = (NewSpark
          .where((NewSpark.datetype >= date_from.isoformat())
                 & (NewSpark.datetype <= date_to.isoformat()))
          .orderBy(asc("datetype")))
    date_ranges.append(df)

# Show the DataFrame for each date range
for i, (bounds, df) in enumerate(zip(windows, date_ranges), start=1):
    print(f"Date range {i}: {bounds[0]} to {bounds[1]}")
    df.show()

In this example, we first build the list of 7-day windows with plain Python dates, stepping one week at a time so the ranges do not overlap. We then filter the original DataFrame for each window with where(), sort the result by the datetype column with orderBy(), and finally show the resulting DataFrame for each date range.
I hope this helps!
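
As an aside, if you only need per-window results such as counts, the Python loop can be avoided entirely by tagging each row with a 7-day bucket index inside Spark. This is a minimal sketch, assuming datetype is parseable as a date and using "2022-08-17" as an assumed anchor for the first window:

from pyspark.sql import functions as F

# Bucket index: number of whole weeks between each row's date and the anchor
anchor = "2022-08-17"  # assumed start of the first 7-day window
bucketed = NewSpark.withColumn(
    "week_bucket",
    F.floor(F.datediff(F.col("datetype"), F.lit(anchor)) / 7)
)

# One groupBy replaces the per-window loop, e.g. row counts per window
bucketed.groupBy("week_bucket").count().orderBy("week_bucket").show()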
