pyspark: how to filter a dataset so it excludes rows whose pickup and dropoff times are out of range

plicqrtu · asked 2022-11-01 · Spark

I have a dataset of NYC taxi data that I'm trying to filter. Its schema is as follows:

root
 |-- medallion: string (nullable = true)
 |-- hack_license: string (nullable = true)
 |-- vendor_id: string (nullable = true)
 |-- rate_code: integer (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_time_in_secs: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- pickup_longitude: double (nullable = true)
 |-- pickup_latitude: double (nullable = true)
 |-- dropoff_longitude: double (nullable = true)
 |-- dropoff_latitude: double (nullable = true)

I want to filter the dataset so that it excludes any entries whose pickup and dropoff times fall outside 9 AM to 5 PM. However, I'm having trouble writing the helper methods used in the withColumn calls. Here is what I have for the withColumn calls:

from pyspark.sql.window import Window
from pyspark.sql import functions as fun

taxi_raw.withColumn("pickup_datetime", remove_pickup_times(fun.col("pickup_datetime"))
taxi_raw.withColumn("dropoff_datetime", remove_dropoff_times(fun.col("dropoff_datetime"))

And here is what I have so far for the helper methods:

import datetime

def remove_pickup_times(pickup_datetime):
    time_start = datetime.time(9,0,0)
    time_end = datetime.time(17,0,0)
    if(pickup_datetime.time() >= time_start and pickup_datetime.time() <= time_end):
        # insert code to remove entry from dataset

def remove_dropoff_times(dropoff_datetime):
    time_start = datetime.time(9,0,0)
    time_end = datetime.time(17,0,0)
    if(dropoff_datetime.time() >= time_start and dropoff_datetime.time() <= time_end):
        # insert code to remove entry from dataset

9lowa7mx 1#

You can use native Spark functions: date_format, given the 'HH:mm:ss' pattern, extracts just the time portion.
Input:

from pyspark.sql import functions as F
df = spark.createDataFrame(
    [('2020-01-01 08:08:08', '2020-01-01 09:09:09'),
     ('2020-01-01 16:08:08', '2020-01-01 17:09:09'),
     ('2020-01-01 16:08:08', '2020-01-01 16:09:09'),
     ('2020-01-01 20:08:08', '2020-01-01 20:09:09')],
    ['pickup_datetime', 'dropoff_datetime'])

Script for trips outside working hours:

start = F.date_format('pickup_datetime', 'HH:mm:ss')
end = F.date_format('dropoff_datetime', 'HH:mm:ss')
df = df.filter((end < '09:00:00') | (start > '17:00:00'))

df.show()
# +-------------------+-------------------+
# |    pickup_datetime|   dropoff_datetime|
# +-------------------+-------------------+
# |2020-01-01 20:08:08|2020-01-01 20:09:09|
# +-------------------+-------------------+
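
Side note: comparing the formatted strings is safe because 'HH:mm:ss' values are fixed-width and zero-padded, so lexicographic order matches chronological order:

# Zero-padded 'HH:mm:ss' strings sort exactly like the times they represent.
assert '08:59:59' < '09:00:00' < '16:59:59' < '17:00:00'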

Script for trips within working hours:

start = F.date_format('pickup_datetime', 'HH:mm:ss')
end = F.date_format('dropoff_datetime', 'HH:mm:ss')
df = df.filter((start >= '09:00:00') & (end <= '17:00:00'))

df.show()
# +-------------------+-------------------+
# |    pickup_datetime|   dropoff_datetime|
# +-------------------+-------------------+
# |2020-01-01 16:08:08|2020-01-01 16:09:09|
# +-------------------+-------------------+
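
Applied to the question, this means you don't need withColumn or a UDF at all; a plain filter is enough. A minimal sketch, assuming the taxi_raw DataFrame and schema from the question (both columns are already timestamps, so date_format applies directly):

start = F.date_format('pickup_datetime', 'HH:mm:ss')
end = F.date_format('dropoff_datetime', 'HH:mm:ss')

# keep only trips that both start and end between 09:00 and 17:00
taxi_filtered = taxi_raw.filter((start >= '09:00:00') & (end <= '17:00:00'))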

ifmq2ha2 2#

Casting a Spark timestamp to long yields seconds since the epoch, so you can cast the timestamps and do the range check numerically.
Try this with spark.sql:

spark.sql(s"""
with t1 ( select '2020-01-01 08:08:08' pickup , '2020-01-01 09:09:09' drop union all 
          select '2020-01-01 16:08:08', '2020-01-01 17:09:09' union all
          select '2020-01-01 16:08:08', '2020-01-01 16:09:09' union all
          select '2020-01-01 20:08:08', '2020-01-01 20:09:09' ) ,
    t2 ( select to_timestamp(pickup) pickup, to_timestamp(drop) drop  from t1  )
    select pickup, drop  from t2
     where (
             cast(drop as long) < cast(date_trunc('DAY',pickup) as long) + (9*60*60) --  09:00:00
             or
             cast(pickup as long) > cast(date_trunc('DAY',pickup) as long) + (17*60*60) --  17:00:00
            ) 
""").show(false)

+-------------------+-------------------+
|pickup             |drop               |
+-------------------+-------------------+
|2020-01-01 20:08:08|2020-01-01 20:09:09|
+-------------------+-------------------+
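
For completeness, the same epoch-seconds check can be expressed with the DataFrame API instead of SQL. A sketch, assuming the taxi_raw DataFrame from the question:

from pyspark.sql import functions as F

# Midnight of the pickup day, as epoch seconds.
midnight = F.date_trunc('DAY', F.col('pickup_datetime')).cast('long')

# Trips that end before 09:00 or start after 17:00, relative to the pickup day.
outside_hours = taxi_raw.filter(
    (F.col('dropoff_datetime').cast('long') < midnight + 9 * 60 * 60)
    | (F.col('pickup_datetime').cast('long') > midnight + 17 * 60 * 60)
)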
