dask读取parquet并指定模式

vnjpjtjt  于 2021-07-09  发布在  Spark
关注(0)|答案(1)|浏览(457)

在读取Parquet文件时,是否有一个dask等价于spark指定模式的能力?可能使用传递给pyarrow的kwargs?
我有一堆Parquet文件在一个桶,但有些领域有轻微不一致的名称。我可以创建一个定制的延迟函数来处理这些情况,但我希望在通过全局化打开它们时可以指定模式。也许不像我想的那样,那么通过全球化开放将尝试连接它们。由于字段名不一致,此操作当前失败。
创建Parquet文件:

import dask.dataframe as dd

df = dd.demo.make_timeseries(
    start="2000-01-01",
    end="2000-01-03",
    dtypes={"id": int, "z": int},
    freq="1h",
    partition_freq="24h",
)

df.to_parquet("df.parquet", engine="pyarrow", overwrite=True)

通过dask读入并在读入后指定模式:

df = dd.read_parquet("df.parquet", engine="pyarrow")
df["z"] = df["z"].astype("float")
df = df.rename(columns={"z": "a"})

通过spark读入并指定模式:

from pyspark.sql import SparkSession
import pyspark.sql.types as T
spark = SparkSession.builder.appName('App').getOrCreate()

schema = T.StructType(
    [
        T.StructField("id", T.IntegerType()),
        T.StructField("a", T.FloatType()),
        T.StructField("timestamp", T.TimestampType()),
    ]
)

df = spark.read.format("parquet").schema(schema).load("df.parquet")
c3frrgcw

c3frrgcw1#

一些选项包括:
指定数据类型(加载后):

custom_dtypes = {"a": float, "id": int, "timestamp": pd.datetime}
df = dd.read_parquet("df.parquet", engine="pyarrow").astype(custom_dtypes)

由于字段名不一致,此操作当前失败。
如果文件中的列名不相同,则可能需要使用自定义列名 delayed 加载前:

@delayed
def custom_load(path):
   df = pd.read_parquet(path)
   # some logic to ensure consistent columns
   if "z" in df.columns:
      df = df.rename(columns={"z": "a"}).astype(custom_dtypes)
   return df

dask_df = dd.from_delayed([custom_load(path) for path in glob.glob("some_path/*parquet")])

相关问题