Pyspark比较来自两个 Dataframe 的日期If语句

o4tp2gmn  于 2023-02-07  发布在  Spark
关注(0)|答案(1)|浏览(161)

表1包含所有员工信息的历史记录,但每隔90天才捕获一次数据。表2包含所有员工的当前信息,每周更新一次时间戳。每隔90天,表2会追加表1。我认为,通过将表1中的时间戳添加90天,并将其与表2中的时间戳进行比较,我可以使用下面的逻辑来执行append,但我得到了一个错误... TypeError:'DataFrame'和'DataFrame'的示例之间不支持'〈'我是否遗漏了什么?

# Let's say the max date in table 1 is 2023-01-15. Adding 90 days would put us on 2023-04-15
futr_date = spark.sql('SELECT date_add(MAX(tm_update), 90) AS future_date FROM tbl_one')

# Checking the date in the weekly refresh table, i have a timestamp of 2023-02-03
curr_date = spark.sql('SELECT DISTINCT tm_update AS current_date FROM tbl_two')

if curr_date > futr_date:
  print('execute block of code that transforms table 2 data and append to table 1')
else:
  print('ignore and check again next week')
juzqafwq

juzqafwq1#

选择语句没有返回值,而是返回 Dataframe ,这就是为什么你会得到错误。如果你想得到值,你需要收集

futr_date  = spark.sql('SELECT date_add(MAX(tm_update), 90) AS future_date FROM tbl_one').collect()[0]

在第二个sql中,你使用distinct来获取日期,它可能会返回值列表,我不确定这是否是你想要的。也许这里你应该使用MIN?如果只有一个ts值,它可能不重要,但是如果有更多的值,可能会导致一些问题
正如我所说,我不知道你的逻辑是否正确,但这里是工作的例子,你可以使用进一步的变化

import time
import pyspark.sql.functions as F

historicalData = [
    (1, time.mktime(time.strptime("24/10/2022", "%d/%m/%Y"))),
    (2, time.mktime(time.strptime("15/01/2023", "%d/%m/%Y"))),
    (3, time.mktime(time.strptime("04/11/2022", "%d/%m/%Y"))),
]
currentData = [
    (1, time.mktime(time.strptime("01/02/2023", "%d/%m/%Y"))),
    (2, time.mktime(time.strptime("02/02/2023", "%d/%m/%Y"))),
    (3, time.mktime(time.strptime("03/02/2023", "%d/%m/%Y"))),
]

oldDf = spark.createDataFrame(historicalData, schema=["id", "tm_update"]).withColumn(
    "tm_update", F.to_timestamp("tm_update")
)
oldDf.createOrReplaceTempView("tbl_one")
currentDf = spark.createDataFrame(currentData, schema=["id", "tm_update"]).withColumn(
    "tm_update", F.to_timestamp("tm_update")
)
currentDf.createOrReplaceTempView("tbl_two")

futr_date = spark.sql(
    "SELECT date_add(MAX(tm_update), 90) AS future_date FROM tbl_one"
).collect()[0]
curr_date = spark.sql(
    "SELECT cast(MIN(tm_update) as date) AS current_date FROM tbl_two"
).collect()[0]

print(futr_date)
print(curr_date)

if curr_date > futr_date:
    print("execute block of code that transforms table 2 data and append to table 1")
else:
    print("ignore and check again next week")

产出

Row(future_date=datetime.date(2023, 4, 15))
Row(current_date=datetime.date(2023, 2, 3))
ignore and check again next week

相关问题