表1包含所有员工信息的历史记录,但每隔90天才捕获一次数据。表2包含所有员工的当前信息,每周更新一次时间戳。每隔90天,表2会追加表1。我认为,通过将表1中的时间戳添加90天,并将其与表2中的时间戳进行比较,我可以使用下面的逻辑来执行append,但我得到了一个错误... TypeError:'DataFrame'和'DataFrame'的示例之间不支持'〈'我是否遗漏了什么?
# Let's say the max date in table 1 is 2023-01-15. Adding 90 days would put us on 2023-04-15
futr_date = spark.sql('SELECT date_add(MAX(tm_update), 90) AS future_date FROM tbl_one')
# Checking the date in the weekly refresh table, i have a timestamp of 2023-02-03
curr_date = spark.sql('SELECT DISTINCT tm_update AS current_date FROM tbl_two')
if curr_date > futr_date:
print('execute block of code that transforms table 2 data and append to table 1')
else:
print('ignore and check again next week')
1条答案
按热度按时间juzqafwq1#
选择语句没有返回值,而是返回 Dataframe ,这就是为什么你会得到错误。如果你想得到值,你需要收集
在第二个sql中,你使用distinct来获取日期,它可能会返回值列表,我不确定这是否是你想要的。也许这里你应该使用MIN?如果只有一个ts值,它可能不重要,但是如果有更多的值,可能会导致一些问题
正如我所说,我不知道你的逻辑是否正确,但这里是工作的例子,你可以使用进一步的变化
产出