使用spark将csv.gz文件转换为Parquet地板

w46czmvw  于 2021-05-29  发布在  Hadoop
关注(0)|答案(3)|浏览(509)

我需要实现转换csv.gz文件在一个文件夹中,在awss3和hdfs,以Parquet文件使用Spark(scala首选)。数据的一列是时间戳,我只有一周的数据集。时间戳格式为:
'年-月-日hh:mm:ss'
我想要的结果是,对于每一天,都有一个文件夹(或分区),在那里特定日期的Parquet文件位于其中。因此将有7个输出文件夹或分区。
我只有一个模糊的想法如何做到这一点,只有sc.textfile是在我的脑海中。spark中是否有一个函数可以转换为parquet?如何在s3和hdfs中实现这一点?
谢谢你的帮助。

brtdzjyr

brtdzjyr1#

老主题,但我认为即使是老主题,如果回答不正确也很重要。
在spark版本>=2中,在您需要将databricks csv包导入作业之前,已经包含csv包,例如“--packages com.d”atabricks:spark-csv_2.10:1.5.0".
csv示例:

id,name,date
1,pete,2017-10-01 16:12
2,paul,2016-10-01 12:23
3,steve,2016-10-01 03:32
4,mary,2018-10-01 11:12 
5,ann,2018-10-02 22:12
6,rudy,2018-10-03 11:11
7,mike,2018-10-04 10:10

首先,您需要创建配置单元表,以便spark写入的数据与配置单元模式兼容(在将来的版本中可能不再需要此功能)
创建表:

create table part_parq_table (
    id int,
    name string
    )
partitioned by (date string)
stored as parquet

完成后,您可以轻松地读取csv并将Dataframe保存到该表中。第二步将用类似“yyyy-mm-dd”的日期格式覆盖列日期。对于每个值,将创建一个文件夹,其中包含特定的行。
scala spark外壳示例:

spark.sqlContext.setConf("hive.exec.dynamic.partition", "true") 
spark.sqlContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")

前两行是配置单元配置,创建一个尚不存在的分区文件夹需要配置单元配置。

var df=spark.read.format("csv").option("header","true").load("/tmp/test.csv")
df=df.withColumn("date",substring(col("date"),0,10))
df.show(false)
df.write.format("parquet").mode("append").insertInto("part_parq_table")

插入完成后,您可以直接查询表,如“select*from part\u parq\u table”。文件夹将在默认cloudera上的tablefolder中创建,例如hdfs:///users/hive/warehouse/part\parq\u table
希望对你有帮助

4xrmg8kj

4xrmg8kj2#

如果您查看spark dataframe api和spark csv包,这将实现您要做的大部分工作—将csv文件读入一个dataframe,然后将dataframe作为parquet写出来,这将为您提供大部分方法。
您仍然需要对时间戳进行解析并使用结果对数据进行分区。

c3frrgcw

c3frrgcw3#

通过第二个tsv读取csv文件/user/hduser/wikipedia/pageviews

"timestamp"             "site"  "requests"
"2015-03-16T00:09:55"   "mobile"        1595
"2015-03-16T00:10:39"   "mobile"        1544

下面的代码使用spark2.0

import org.apache.spark.sql.types._
var wikiPageViewsBySecondsSchema = StructType(Array(StructField("timestamp", StringType, true),StructField("site", StringType, true),StructField("requests", LongType, true) ))
var wikiPageViewsBySecondsDF = spark.read.schema(wikiPageViewsBySecondsSchema).option("header", "true").option("delimiter", "\t").csv("/user/hduser/wikipedia/pageviews-by-second-tsv")

将字符串时间戳转换为时间戳

wikiPageViewsBySecondsDF= wikiPageViewsBySecondsDF.withColumn("timestampTS", $"timestamp".cast("timestamp")).drop("timestamp")
or 
wikiPageViewsBySecondsDF= wikiPageViewsBySecondsDF.select($"timestamp".cast("timestamp"), $"site", $"requests")

写入Parquet文件。

wikiPageViewsBySecondsTableDF.write.parquet("/user/hduser/wikipedia/pageviews-by-second-parquet")

相关问题