scala 仅从Spark SQL中的PATH中提取文件名的UDF

nbnkbykc  于 2022-11-09  发布在  Scala
关注(0)|答案(3)|浏览(181)

在ApacheSpark中有一个INPUT_FILE_NAME函数,我用它来向数据集中添加新的列,其名称为当前正在处理的文件。
问题是,我想以某种方式定制这个函数,使其只返回文件名,而忽略S3上它的完整路径。
目前,我正在使用map函数替换第二步中的路径:

val initialDs = spark.sqlContext.read
.option("dateFormat", conf.dateFormat)
.schema(conf.schema)
.csv(conf.path).withColumn("input_file_name", input_file_name)
...
...
def fromFile(fileName: String): String = {
  val baseName: String = FilenameUtils.getBaseName(fileName)
  val tmpFileName: String = baseName.substring(0, baseName.length - 8) //here is magic conversion ;)
  this.valueOf(tmpFileName)
}

但是我想用一些像这样的东西

val initialDs = spark.sqlContext.read
    .option("dateFormat", conf.dateFormat)
    .schema(conf.schema)
    .csv(conf.path).withColumn("input_file_name",**customized_input_file_name_function**)
v09wglhw

v09wglhw1#

在Scala中:


# register udf

spark.udf
  .register("get_only_file_name", (fullPath: String) => fullPath.split("/").last)

# use the udf to get last token(filename) in full path

val initialDs = spark.read
  .option("dateFormat", conf.dateFormat)
  .schema(conf.schema)
  .csv(conf.path)
  .withColumn("input_file_name", get_only_file_name(input_file_name))

编辑:在Java中根据评论


# register udf

spark.udf()
  .register("get_only_file_name", (String fullPath) -> {
     int lastIndex = fullPath.lastIndexOf("/");
     return fullPath.substring(lastIndex, fullPath.length - 1);
    }, DataTypes.StringType);

import org.apache.spark.sql.functions.input_file_name    

# use the udf to get last token(filename) in full path

Dataset<Row> initialDs = spark.read()
  .option("dateFormat", conf.dateFormat)
  .schema(conf.schema)
  .csv(conf.path)
  .withColumn("input_file_name", get_only_file_name(input_file_name()));
gzszwxb4

gzszwxb42#

借用相关问题here,下面的方法更可移植,不需要自定义UDF。

Spark SQL代码片段:reverse(split(path, '/'))[0]
Spark SQL示例:

WITH sample_data as (
SELECT 'path/to/my/filename.txt' AS full_path
)
SELECT
      full_path
    , reverse(split(full_path, '/'))[0] as basename
FROM sample_data

解释:split()函数将路径分成块,reverse()将最后一项(文件名)放在数组前面,以便[0]可以只提取文件名。

以下是完整的代码示例:

spark.sql(
    """
      |WITH sample_data as (
      |    SELECT 'path/to/my/filename.txt' AS full_path
      |  )
      |  SELECT
      |  full_path
      |  , reverse(split(full_path, '/'))[0] as basename
      |  FROM sample_data
      |""".stripMargin).show(false)

结果:

+-----------------------+------------+
|full_path              |basename    |
+-----------------------+------------+
|path/to/my/filename.txt|filename.txt|
+-----------------------+------------+
nmpmafwu

nmpmafwu3#

  • Commons io*是Spark Means中的自然/最简单的导入(不需要添加额外的依赖...)
import org.apache.commons.io.FilenameUtils
getBaseName(String fileName)
  • 从完整文件名中获取基本名称,减去完整路径和扩展名。*
val baseNameOfFile = udf((longFilePath: String) => FilenameUtils.getBaseName(longFilePath))

用法是这样的..。

yourdataframe.withColumn("shortpath" ,baseNameOfFile(yourdataframe("input_file_name")))
.show(1000,false)

相关问题