如果列值依赖于文件路径,那么在一次读取多个文件时,有没有一种方法可以将文本作为列添加到sparkDataframe中?

wtlkbnrh  于 2021-05-24  发布在  Spark
关注(0)|答案(2)|浏览(486)

我试图把很多avro文件读入sparkDataframe。它们都共享相同的s3文件路径前缀,所以最初我运行的是:

path = "s3a://bucketname/data-files"
df = spark.read.format("avro").load(path)

成功识别了所有文件。
单个文件类似于:

"s3a://bucketname/data-files/timestamp=20201007123000/id=update_account/0324345431234.avro"

在试图操作数据时,代码一直出错,并显示一条消息,其中一个文件不是avro数据文件。收到的实际错误消息是: org.apache.spark.SparkException: Job aborted due to stage failure: Task 62476 in stage 44102.0 failed 4 times, most recent failure: Lost task 62476.3 in stage 44102.0 (TID 267428, 10.96.134.227, executor 9): java.io.IOException: Not an Avro data file .
为了避免这个问题,我能够得到我感兴趣的avro文件的显式文件路径。把他们列在名单上之后 (file_list) ,我成功地跑了 spark.read.format("avro").load(file_list) .
现在的问题是-我想在dataframe中添加一些字段,这些字段是filepath的一部分(即上面示例中的时间戳和id)。
当只使用bucket和prefix filepath来查找文件时(方法#1),这些字段会自动附加到结果Dataframe中。有了显式文件路径,我就没有这个优势了。
我想知道在使用spark读取文件时是否有方法包含这些列。
按顺序处理文件的方式如下:

for file in file_list:
    df = spark.read.format("avro").load(file)
    id, timestamp = parse_filename(file)
    df = df.withColumn("id", lit(id))\
         .withColumn("timestamp", lit(timestamp))

但有超过50万个文件,这将是一个永恒的过程。
我是新来的Spark,所以任何帮助将不胜感激,谢谢!

8ehkhllq

8ehkhllq1#

这里需要解决两个问题:
指定文件
spark内置了读取给定路径中特定类型的所有文件的处理。正如@sri_karthik所建议的,尝试提供一条类似 "s3a://bucketname/data-files/*.avro" (如果不起作用,可以试试 "s3a://bucketname/data-files/**/*.avro" ... 我不记得spark使用的确切的模式匹配语法(patternmatchingsyntax),它应该只获取所有avro文件,并消除在这些路径中看到非avro文件的错误。在我看来,这比手动获取文件路径并显式指定它们更优雅。
另外,您看到这种情况的原因可能是因为文件夹通常会被标记为元数据文件,如 .SUCCESS 或者 .COMPLETED 表明它们已准备好食用。
从文件路径提取元数据
如果您检查这个stackoverflow问题,它将显示如何将文件名添加为新列(scala和pyspark都是这样)。然后你可以使用 regexp_extract 函数从该文件名字符串中解析出所需的元素。我从未在spark中使用过scala,因此无法在这方面帮助您,但它应该与pyspark版本类似。

pxyaymoc

pxyaymoc2#

为什么不先使用wholetextfiles方法读取文件,然后在开始时将路径名添加到数据本身中呢。然后可以从数据中过滤出文件名,并在创建Dataframe时将其添加为列。我同意这是一个两步的过程。但它应该有用。要获取文件的时间戳,您需要js不可序列化的filesystem对象,即它不能用于sparks并行操作,因此您必须使用file和timestamp创建一个本地集合,并以某种方式将其与使用wholetextfiles创建的rdd连接起来。

相关问题