pyspark 将具有不同标头的多个CSV读入一个 Dataframe

6yt4nkrj  于 2022-11-28  发布在  Spark
关注(0)|答案(1)|浏览(207)

我有几个CSV文件,其中一些文件可能有一些匹配的列,一些有完全不同的列。例如文件1有以下列:
['电路ID','电路参考','名称','位置','国家/地区','纬度','经度','替代','网址']
file2具有以下列:
['赛道ID','年份','轮次','赛道ID','名称','日期','时间','网址']
我想创建一个包含所有这些列的 Dataframe 。我写了下面的代码,希望预定义的模式和CSV文件头之间的Map会自动发生,但没有成功。

sch=StructType([StructField('circuitId',StringType(),True),
StructField('year',StringType(),True),
StructField('name',StringType(),True),
StructField('alt',StringType(),True),
StructField('url',StringType(),True),
StructField('round',StringType(),True),
StructField('lng',StringType(),True),
StructField('date',StringType(),True),
StructField('circuitRef',StringType(),True),
StructField('raceId',StringType(),True),
StructField('lat',StringType(),True),
StructField('location',StringType(),True),
StructField('country',StringType(),True),
StructField('time',StringType(),True)
])

df=spark.read \
        .option('header','true') \
        .schema(sch) \
        .csv('/FileStore/Udemy/Formula_One_Raw/*.csv')

使用这段代码,我将得到以下输出:

gfttwv5a

gfttwv5a1#

CSV使用列的位置(而不是名称)来解析架构,因此,如果您有一个包含8列的文件,例如:

['raceId', 'year', 'round', 'circuitId', 'name', 'date', 'time', 'url']

您尝试应用的架构的前8个字段为:

StructField('circuitId',StringType(),True),
StructField('year',StringType(),True),
StructField('name',StringType(),True),
StructField('alt',StringType(),True),
StructField('url',StringType(),True),
StructField('round',StringType(),True),
StructField('lng',StringType(),True),
StructField('date',StringType(),True)

raceId将被推断为circuitIdround将被推断为name,依此类推。您可以执行以下操作来解决此问题:

  • 为每个不同的文件创建单独的架构,考虑其中的列,并在最后添加不属于此特定架构的列-这样,它们就应该包含在DataFrame中,并使用NULL填充。

因此,对于顶部的示例CSV模式,您可以如下声明模式:

StructField('raceId',StringType(),True),
StructField('year',StringType(),True),
StructField('round',StringType(),True),
StructField('circuitId',StringType(),True),
StructField('name',StringType(),True),
StructField('date',StringType(),True),
StructField('time',StringType(),True),
StructField('url',StringType(),True),
StructField('alt',StringType(),True),
StructField('lng',StringType(),True),
StructField('circuitRef',StringType(),True),
StructField('lat',StringType(),True),
StructField('location',StringType(),True),
StructField('country',StringType(),True)

您的DataFrame应该如下所示:

+------+----+-----+---------+--------------------+----------+--------+--------------------+----+----+----------+----+--------+-------+
|raceId|year|round|circuitId|                name|      date|    time|                 url| alt| lng|circuitRef| lat|location|country|
+------+----+-----+---------+--------------------+----------+--------+--------------------+----+----+----------+----+--------+-------+
|     1|2009|    1|        1|Australian Grand ...|2009-03-29|06:00:00|http://en.wikiped...|null|null|      null|null|    null|   null|
+------+----+-----+---------+--------------------+----------+--------+--------------------+----+----+----------+----+--------+-------+

在将所有数据读取到DataFrame之后,如果需要,您将能够使用列名来联合它们。

相关问题