sparkDataframe读写

bnl4lu3b  于 2021-06-27  发布在  Hive
关注(0)|答案(1)|浏览(396)

我有一个用例,需要将数百万个json格式的数据加载到apachehive表中。所以我的解决方案很简单,将它们加载到dataframe中,并将它们写为Parquet文件。然后我将在它们上面创建一个外部表。
我使用的是ApacheSpark2.1.0和Scala2.11.8。
碰巧所有的消息都遵循一种灵活的模式。例如,“amount”列的值可以是-1.0或1。
由于我正在将数据从半结构化格式转换为结构化格式,但我的模式稍有变化,因此我认为用于json等数据源的inferschema选项将对我有所帮助。

spark.read.option("inferSchema","true").json(RDD[String])

当我在读取json数据时使用inferschema作为true时,
案例1:对于较小的数据,所有Parquet文件的数量都是原来的两倍。
案例2:对于较大的数据,一些Parquet文件的amount为double,而其他文件的amount为int64。
我试着调试,发现了一些概念,比如模式演化和模式合并,这些概念在我的脑海中浮现,让我疑惑多于答案。
我的疑问是
当我尝试推断模式时,它不将推断的模式强制到完整的数据集上吗?
由于我的限制,我不能强制任何模式,所以我想将整个列转换为double数据类型,因为它可以同时包含整数和十进制数。有更简单的方法吗?
我的猜测是,由于数据是分区的,因此inferschema在每个分区上都起作用,然后它给了我一个通用的schema,但它没有执行schema之类的操作。如果我错了,请纠正我。
注意:我之所以使用inferschema选项,是因为传入的数据太灵活/变量,无法强制我自己的case类,尽管有些列是必需的。如果你有一个更简单的解决方案,请建议。

2ekbmq32

2ekbmq321#

推断模式实际上只是处理所有的行以找到类型。一旦这样做了,它就会合并结果以找到整个数据集的公共模式。
例如,某些字段可能在某些行中有值,但在其他行中没有值。所以这个字段的推断模式就可以为空。
为了回答您的问题,可以为您的输入推断模式。但是,由于您打算在配置单元中使用输出,因此应该确保所有输出文件具有相同的模式。
一个简单的方法是使用铸造(正如你所建议的)。我通常喜欢在工作的最后阶段进行选择,只列出所有的列和类型。我觉得这让这份工作更具可读性。
例如

df
.coalesce(numOutputFiles)
.select(
  $"col1"        .cast(IntegerType).as("col1"),
  $"col2"        .cast( StringType).as("col2"),
  $"someOtherCol".cast(IntegerType).as("col3")
)
.write.parquet(outPath)

相关问题