pyspark如何创建列并在存在滚动日期时间记录时填充true/false

z4bn682m  于 2021-07-09  发布在  Spark
关注(0)|答案(1)|浏览(272)

数据集包含有每日记录的产品,但有时它会丢失,所以我想创建额外的列来显示它在过去几天中是否存在
我有以下情况
创建t-1、t-2等列并用下面的内容填充
如果记录存在,则用1填充t-1,否则为零
原始表格:

Item    Cat    DateTime    Value
 A      C1     1-1-2021     10
 A      C1     2-1-2021     10
 A      C1     3-1-2021     10
 A      C1     4-1-2021     10
 A      C1     5-1-2021     10
 A      C1     6-1-2021     10
 B      C1     1-1-2021     20
 B      C1     4-1-2021     20

预期结果:

Item    Cat    DateTime    Value   T-1   T-2  T-3    T-4    T-5
 A       C1     1-1-2021     10     0     0    0       0     0
 A       C1     2-1-2021     10     1     0    0       0     0      (T-1 is 1 as we have 1-1-2021 record)
 A       C1     3-1-2021     10     1     1    0       0     0
 A       C1     4-1-2021     10     1     1    1       0     0
 A       C1     5-1-2021     10     1     1    1       1     0
 A       C1     6-1-2021     10     1     1    1       1     1
 B       C1     1-1-2021     20     0     0    0       0     0
 B       C1     2-1-2021      0     1     0    0       0     0      (2-1-2021 record need to be created with value zero since we miss this from original data-set, plus T-1 is  1 as we have this record from original data-set)
 B       C1     3-1-2021      0     0     1    0       0     0
 B       C1     4-1-2021     20     0     0    1       0     0
 B       C1     5-1-2021      0     1     0    0       1     0
y3bcpkx1

y3bcpkx11#

假设原始表数据存储在 original_data ,我们可以
使用名为sparksql的 daily_records 生成可能的日期。这是通过从数据集中识别最小和最大日期之间的天数来完成的,然后使用表生成函数生成可能的日期 explode 以及 spaces 生成所有可能的 item , date 记录
将这些记录与实际数据连接起来,就有了一个包含值的完整数据集
使用sparksql查询视图,并使用左连接和 CASE 声明


# Step 1

original_data.createOrReplaceTempView("daily_records")

# Step 2-4

daily_records = sparkSession.sql("""
WITH date_bounds AS (
   SELECT min(DateTime) as mindate, max(DateTime) as maxdate FROM daily_records
),
possible_dates AS (
   SELECT 
       date_add(mindate,index.pos) as DateTime 
   FROM 
       date_bounds
   lateral view posexplode(split(space(datediff(maxdate,mindate)),"")) index
),
unique_items AS (
   SELECT DISTINCT Item, Cat from daily_records
),
possible__item_dates AS (
   SELECT Item, Cat, DateTime FROM unique_items INNER JOIN possible_dates ON 1=1
),
possible_records AS (
   SELECT 
      p.Item,
      p.Cat,
      p.DateTime,
      r.Value
    FROM
      possible__item_dates p
    LEFT JOIN
      daily_records r on p.Item = r.Item and p.DateTime = r.DateTime

)
select * from possible_records

""")
daily_records.createOrReplaceTempView("daily_records")
daily_records.show()

# Step 5 - store results in desired_result

# This is optional, but I have chosen to generate the sql to create this dataframe

periods = 5 # Number of periods to check for
period_columns = ",".join(["""
    CASE
        WHEN t{0}.Value IS NULL THEN 0
        ELSE 1
    END as `T-{0}`
""".format(i) for i in range(1,periods+1)])
period_joins = " ".join(["""
    LEFT JOIN
        daily_records t{0} on datediff(to_date(t.DateTime),to_date(t{0}.DateTime))={0} and t.Item = t{0}.Item
""".format(i) for i in range(1,periods+1)])

period_sql = """
SELECT
    t.*
    {0}
FROM
    daily_records t
 {1}
ORDER BY
   Item, DateTime
""".format(
    "" if len(period_columns)==0 else ",{0}".format(period_columns),
    period_joins
)
desired_result= sparkSession.sql(period_sql)
desired_result.show()

实际 SQL 生成:

SELECT
    t.*,
    CASE
        WHEN t1.Value IS NULL THEN 0
        ELSE 1
    END as `T-1`,
    CASE
        WHEN t2.Value IS NULL THEN 0
        ELSE 1
    END as `T-2`,
    CASE
        WHEN t3.Value IS NULL THEN 0
        ELSE 1
    END as `T-3`,
    CASE
        WHEN t4.Value IS NULL THEN 0
        ELSE 1
    END as `T-4`,
    CASE
        WHEN t5.Value IS NULL THEN 0
        ELSE 1
    END as `T-5`
FROM
    daily_records t

    LEFT JOIN
        daily_records t1 on datediff(to_date(t.DateTime),to_date(t1.DateTime))=1 and t.Item = t1.Item

    LEFT JOIN
        daily_records t2 on datediff(to_date(t.DateTime),to_date(t2.DateTime))=2 and t.Item = t2.Item

    LEFT JOIN
        daily_records t3 on datediff(to_date(t.DateTime),to_date(t3.DateTime))=3 and t.Item = t3.Item

    LEFT JOIN
        daily_records t4 on datediff(to_date(t.DateTime),to_date(t4.DateTime))=4 and t.Item = t4.Item

    LEFT JOIN
        daily_records t5 on datediff(to_date(t.DateTime),to_date(t5.DateTime))=5 and t.Item = t5.Item

ORDER BY
   Item, DateTime

注意。 to_date 是可选的,如果 DateTime 已格式化为 date 字段或格式 yyyy-mm-dd

相关问题