pyspark:将多个文件加载到rdd中,但保留文件名

cgvd09ve  于 2021-05-27  发布在  Spark
关注(0)|答案(1)|浏览(377)

这个问题在这里已经有了答案

spark加载数据并将文件名添加为dataframe列(1个答案)
7个月前关门了。
我有一些csv文件的格式如下:

JO.csv
KE.csv
NZ.csv
CO.csv
MY.csv
IN.csv
PL.csv
NO.csv
ZA.csv
DK.csv
IL.csv
CN.csv
BE.csv
AR.csv
IT.csv
JP.csv

我正在尝试将这些都加载到sparkrdd中,并保留原产国(一个剥离的文件名就足够了)
从pyspark.sql导入sqlcontext sqlcontext=sqlcontext(sc)

a = sc.textFile("./*.csv")

display_rdd_function(a)

运行此命令将使sparkrdd中的所有行都没有键。
我想把每一个项目作为[strip(filename),csv\u row](或大约)。
关于添加多个文件还有许多其他问题,但是没有人考虑使用键添加多个文件。
谢谢您

yzuktlbb

yzuktlbb1#

我建议用 sqlCOntext 阅读 csv directory . 我在本地试过这个-

目录结构

csvs
    |- JO.csv
       ------
       1
       2
    |- KE.csv
       ------
       3
       4

阅读目录如下-

val csv_dir_relative_path = "csvs"
    val dataPath = getClass.getResource("/" + csv_dir_relative_path).getPath
    val inputDF = sqlContext.read
      .schema(StructType(Array(StructField("id", DataTypes.IntegerType))))
      .csv(dataPath)
    inputDF.show(false)

您可以观察到在单个Dataframe中按顺序读取所有文件-

+---+
|id |
+---+
|1  |
|2  |
|3  |
|4  |
+---+

现在你可以添加 file_name 像列一样-

val withFileName = inputDF.withColumn("file_name", functions.input_file_name())
    withFileName.show(false)

下面是 show 结果,请注意,我已经替换了 full path<path> ```
+---+----------------------------------------------------------------------------------------------------------------------+
|id |file_name |
+---+----------------------------------------------------------------------------------------------------------------------+
|1 |file:////resources/csvs/JO.csv|
|2 |file:////resources/csvs/JO.csv|
|3 |file:///<>/resources/csvs/KE.csv|
|4 |file:///<>/resources/csvs/KE.csv|
+---+----------------------------------------------------------------------------------------------------------------------+

类似的代码将在spark的pythonapi中工作-

相关问题