为什么pyspark中的S3目录源的input_file_name()为空?

ma8fv8wu  于 2023-04-29  发布在  Spark
关注(0)|答案(8)|浏览(143)

我试图获取通过AWS Glue中的S3数据目录加载的每个文件的输入文件名(或路径)。
我在一些地方使用了readinput_file_name()应该提供这些信息(尽管需要注意的是,这只在调用from_catalog而不是from_options时有效,我相信我是这样做的!)).
因此,下面的代码看起来应该可以工作,但总是为每个input_file_name返回空值。

import sys

from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from pyspark.sql.functions import input_file_name

args = getResolvedOptions(sys.argv, ['JOB_NAME', 'TempDir'])
sc = SparkContext()
gc = GlueContext(sc)
spark = gc.spark_session

job = Job(gc)
job.init(args['JOB_NAME'], args)

# Get the source frame from the Glue Catalog, which describes files in S3
fm_source = gc.create_dynamic_frame.from_catalog(
    database='database_name',
    table_name='table_name',
    transformation_ctx='fm_source',
)

df_source = fm_source.toDF().withColumn('input_file_name', input_file_name())
df_source.show(5)

结果输出:

+-------------+---------------+
|other_columns|input_file_name|
+-------------+---------------+
|           13|               |
|           33|               |
|           53|               |
|           73|               |
|           93|               |
+-------------+---------------+

是否有其他方法可以创建框架来确保input_file_name()被填充?现在,我尝试通过create_dynamic_frame.from_catalogcreate_dynamic_frame.from_optionsgetSource().getFrame()构建源帧,但得到的结果是相同的,每个都是空的input_file_name列。

k97glaaz

k97glaaz1#

我还添加了我的经验,在我的情况下,我收到了一个空的结果,因为调用cache()方法。
例如:

import pyspark.sql.functions as F

df = spark.read.json("/my/folder/test.json")
df.cache()
df = df.withColumn("input_file_name", F.input_file_name())

df.show()

我收到

+-------------+---------------+
|other_columns|input_file_name|
+-------------+---------------+
|           13|               |
|           33|               |
|           53|               |
|           73|               |
|           93|               |
+-------------+---------------+

但是如果删除df.cache()行,input_file_name列就会正确地显示输入文件名。
解决方法可能是在缓存之前调用F.input_file_name()

o2rvlv0m

o2rvlv0m2#

我相信这在使用groupFiles选项时是不可能的,因为Glue在幕后连接文件以创建最佳的输入数量。因此,input_file_name的概念在此上下文中没有意义,因为原始文件路径不再是直接输入。
然而,文档在某种意义上有点误导,即使输入少于50,000个文件,如果不显式禁用该选项,Glue也会根据文件大小将输入连接起来。在我们的例子中,我们有成千上万的小输入文件(〈1 MB)导致这种行为。
您可以通过显式禁用分组来轻松验证这一点(注意,对于类似于我们的场景,这将对性能产生严重影响:

ds_s3 = self.gc.getSource(
    connection_type='s3',
    paths=paths,
    groupFiles='none',
)
fm_s3 = ds_s3.getFrame()

当然,最好不要依赖于输入状态或上下文,因此我们最终编写了一个在S3 PUT上触发的AWS Lambda,它将元数据(包括文件名和路径)写入文件本身。

c0vxltue

c0vxltue3#

我发现了另一个奇怪的行为。当我在input_file_path之前添加limit()时,id也不起作用。
此代码的文件名为空

df_raw = (spark
          .read
          .option("delimiter", ";")
          .csv(filepath,header = "true",inferSchema=True)      
          .select("COL1","COL2")
          .limit(1000)
          )

df_transform = (df_raw
          .withColumn("filename", f.input_file_name())
          )

这个代码有效

df_raw = (spark
          .read
          .option("delimiter", ";")
          .csv(filepath,header = "true",inferSchema=True)      
          .select("COL1","COL2")
          )

df_transform = (df_raw
          .withColumn("filename", f.input_file_name())
          .limit(1000)
          )

我花了一些时间才弄清楚这一点,因为我试图通过只阅读几行来加快调试速度。

ztyzrc3y

ztyzrc3y4#

我遇到了和Vzzarr一样的问题。当我在dataframe上调用cache后创建input_file_path列时,文件路径为空。但是当我在调用cache之前创建input_file_path时,它工作了。

omjgkv6w

omjgkv6w5#

如果上面的方法都不行,再加上这个。.

spark.catalog.clearCache()

以清除群集中的所有缓存。

t5fffqht

t5fffqht6#

有时,有一些边缘情况,其中这个方法,'input_file_name()',在DynamicFrame中不能像预期的那样工作。在这种情况下,您可以尝试对DynamicFrame使用“attachFilename”选项来检索记录的文件路径。
下面是一个例子:
==验证码:

dyf6_ = glue_context.create_dynamic_frame.from_options(
    connection_type="s3",
    connection_options={
        "paths": ["s3://awsglue-datasets/examples/highly-partitioned-table/year=2021/month=12/"],
        "recurse":True,
        "groupFiles": "inPartition",
        "groupSize": "1048576"
    },
    format="json",
    format_options={
        "attachFilename": "filenameCol"
    }
)
dyf6_.toDF().show()
uoifb46i

uoifb46i7#

这个问题很重要,因为有些人可能会在生产环境中使用此功能。我们在MediaQX的数据团队中做了一些调查,以下是我们的发现。感谢@Leyla Helin Çetin和@Eren Sakarya。
在研究结果之前,请确保您没有在Glue上使用此input_file_name功能,因为当文件大小很小时,它会中断。请确保将必要的列添加到文件中,并且不依赖于路径。如果你使用的是纯Spark,你可能不需要担心这一点。
我们检查了所有相关帖子的回复,我将链接它们,为即将到来的问题持有人提供循环阅读:
在这种情况下,这是最合理的解决方案,但在glue上下文中,尚不清楚它是否完美,您可能会错过一些glue功能,如书签:如何在AWS Glue作业中追加一个包含源名称的新列?
有几种方法显示了Map函数方法,但它们都使用input_file_name函数,因此对于生产解决方案来说并不准确。在AWS Glue中使用S3文件夹结构作为 meta数据
有些人通过使用作业名称解决了他们的问题,但对于不同的配置,这是不合理的:AWS Glue: How to add a column with the source filename in the output?
在这个线程中,讨论了groupFiles选项,在AWS文档中,它声明它不适用于parquet文件,但实际上它可以工作,并且它不是这个问题的直接解决方案。
另一种解决方案是删除文件名中的点,但在我们的情况下,这不是一个解决方案:Glue dynamic frame is not populating from s3 bucket
您可以通过设置以下参数使函数工作:spark.conf.set("spark.sql.files.maxPartitionBytes", "1MB")
但是,我们认为它增加了阅读时间,并且使input_file_name功能正常工作。我们认为,当阅读过程非常快时,该功能停止。请注意,读取文件的总大小应该大于您设置的参数,因此它也不是一个完美的解决方案。除此之外,它还会降低作业的速度,因为该参数的默认值是128 MB。
我希望这个答案能有所帮助,请大声说出您的发现,以便AWS工作人员在修复此错误时可能会使用此答案。
干杯。

vof42yt1

vof42yt18#

这发生在我身上是因为我的数据块集群有一个不推荐的运行时(7.6)我试过7。3、工作中的LTS:)

相关问题