我想用 badrecordspath
在spark in in azure databricks中获取与作业相关的损坏记录的计数,但没有简单的方法可以知道:
如果文件已写入
文件已写入哪个分区
我想也许我可以用这样的代码检查最后一个分区是不是在最后60秒内创建的:
from datetime import datetime, timedelta
import time
import datetime
df = spark.read.format('csv').option("badRecordsPath", corrupt_record_path)
partition_dict = {} #here the dictionnary of partition and corresponding timestamp
for i in dbutils.fs.ls(corrupt_record_path):
partition_dict[i.name[:-1]]=time.mktime(datetime.datetime.strptime(i.name[:-1], "%Y%m%dT%H%M%S").timetuple())
# here i get the timestamp of one minute ago
submit_timestamp_utc_minus_minute = datetime.datetime.now().replace(tzinfo = timezone.utc) - timedelta(seconds=60)
submit_timestamp_utc_minus_minute = time.mktime(datetime.datetime.strptime(submit_timestamp_utc_minus_minute.strftime("%Y%m%dT%H%M%S"), "%Y%m%dT%H%M%S").timetuple())
# Here i compare the latest partition to check if is more recent than 60 seconds ago
if partition_dict[max(partition_dict, key=lambda k: partition_dict[k])]>submit_timestamp_utc_minus_minute :
corrupt_dataframe = spark.read.format('json').load(corrupt_record_path+partition+'/bad_records')
corrupt_records_count = corrupt_dataframe.count()
else:
corrupt_records_count = 0
但我看到两个问题:
这是一个很大的开销(好的代码也可以写得更好,但仍然)
我甚至不确定在读取作业中何时定义分区名称。是在工作的开始还是结束?如果是在一开始,那么60秒根本就不相关。
顺便说一句,我不能对corrupt\u records\u列使用permissive read,因为我不想缓存Dataframe(您可以在这里看到我的另一个问题)
任何建议或意见将不胜感激!
暂无答案!
目前还没有任何答案,快来回答吧!