HDFS PySpark中parquet文件的读取范围

ut6juiuv  于 2023-10-14  发布在  HDFS
关注(0)|答案(3)|浏览(240)

我有大量的日常文件存储在HDFS中,其中分区以YYYY-MM-DD格式存储。
举例来说:

$ hdfs dfs -ls /my/path/here
<some stuff here> /my/path/here/cutoff_date=2023-10-02
<some stuff here> /my/path/here/cutoff_date=2023-10-03
<some stuff here> /my/path/here/cutoff_date=2023-10-04
<some stuff here> /my/path/here/cutoff_date=2023-10-05
<some stuff here> /my/path/here/cutoff_date=2023-10-06

我如何在这种结构下读取一系列日期?特别是,我需要读取2023-06-072023-10-06之间的所有可用分区。
根据this post,我可以使用sqlContext来传递一个使用[]的范围。沿着下列路线的东西:

sqlContext.read.load('/my/path/here/cutoff_date=[2023-10-02-2023-10-06]')

这显然是行不通的

t2a7ltrp

t2a7ltrp1#

由于/my/path/here/cutoff_date=[2023-10-02-2023-10-06]看起来是一个Linux shell命令,可能是/my/path/here/cutoff_date=2023-10-{02..06}

bn31dyow

bn31dyow2#

也许阅读下面需要的分区,然后联合他们,以便有一个最后的框架将做你想要的?将timedelta更改为taste。
编辑:如果某些日期数据在范围内丢失,则还创建了一个名为path_exists的函数,以便在这种情况下不会出现异常/文件未找到错误

from pyspark.sql import SparkSession
from datetime import datetime, timedelta
import subprocess

def path_exists(hdfs_path):
    try:
        subprocess.check_output(['hdfs', 'dfs', '-ls', hdfs_path])
        return True
    except subprocess.CalledProcessError:
        return False

spark = SparkSession.builder.getOrCreate()

start_date = datetime.strptime("2023-06-07", "%Y-%m-%d")
end_date = datetime.strptime("2023-10-06", "%Y-%m-%d")
delta = timedelta(days=1)

date_list = []
while start_date <= end_date:
    date_str = start_date.strftime("%Y-%m-%d")
    date_list.append(date_str)
    start_date += delta

df_list = []
for d in date_list:
    hdfs_path = f'/my/path/here/cutoff_date={d}'
    if path_exists(hdfs_path):
        df_list.append(spark.read.load(hdfs_path))

if df_list:
    final_df = df_list[0]
    for df in df_list[1:]:
        final_df = final_df.union(df)
else:
    print("No valid partitions found in the range.")
ruyhziif

ruyhziif3#

[]是一个字符串范围,就像你在正则表达式中看到的那样。
所以对于10/2到10/6,cutoff_date=2023-10-0[2-6]
06/07 ~ 06/07,有点小问题。您可以尝试cutoff_date=2023-[0-1][0-6]-[0-3][0-9],但这将包括2023-10-07,2023-10-20等日期。因此,在阅读之后,您需要额外的过滤器。
然而,说实话,如果你是阅读有点大的数据,因为PartitionFilters,我认为阅读整个文件夹和做过滤器与阅读子集只会有一个边际差异。
参考:https://mungingdata.com/apache-spark/partition-filters-pushed-filters/

相关问题