在ApacheSpark中有一个INPUT_FILE_NAME函数,我用它来向数据集中添加新的列,其名称为当前正在处理的文件。
问题是,我想以某种方式定制这个函数,使其只返回文件名,而忽略S3上它的完整路径。
目前,我正在使用map函数替换第二步中的路径:
val initialDs = spark.sqlContext.read
.option("dateFormat", conf.dateFormat)
.schema(conf.schema)
.csv(conf.path).withColumn("input_file_name", input_file_name)
...
...
def fromFile(fileName: String): String = {
val baseName: String = FilenameUtils.getBaseName(fileName)
val tmpFileName: String = baseName.substring(0, baseName.length - 8) //here is magic conversion ;)
this.valueOf(tmpFileName)
}
但是我想用一些像这样的东西
val initialDs = spark.sqlContext.read
.option("dateFormat", conf.dateFormat)
.schema(conf.schema)
.csv(conf.path).withColumn("input_file_name",**customized_input_file_name_function**)
3条答案
按热度按时间v09wglhw1#
在Scala中:
编辑:在Java中根据评论
gzszwxb42#
借用相关问题here,下面的方法更可移植,不需要自定义UDF。
Spark SQL代码片段:
reverse(split(path, '/'))[0]
Spark SQL示例:
解释:
split()
函数将路径分成块,reverse()
将最后一项(文件名)放在数组前面,以便[0]
可以只提取文件名。以下是完整的代码示例:
结果:
nmpmafwu3#
用法是这样的..。