我有一亿条记录的表格数据,每条记录有15列。我需要查询此数据的3列,并筛选出记录,以便在进一步处理中使用。
目前我在两种方法之间做决定
方法1将数据存储为csv或Parquet在hdfs中。当我需要查询时,读取整个数据并使用sparksql进行查询。
方法2使用hivecontext创建配置单元表,并持久化表和配置单元元数据。需要时使用hivecontext查询此表。
怀疑:
在方法2中,是否将查询推送到数据库级(hdfs)并且只读取和返回满足条件的记录?或者将整个数据读入内存(就像大多数spark作业一样),然后使用元数据运行查询?
运行时:在这两种方法中,哪一种更快?
请注意,配置单元设置不是配置单元over spark,而是spark提供的hivecontext。
spark版本:2.2.0
2条答案
按热度按时间dphi5xsq1#
在方法2中,应该以适当的方式构造和存储配置单元表。如果配置单元表被分区并以支持索引的文件格式存储(如orc),则spark不会加载所有数据。spark优化引擎将使用分区修剪和 predicate 下推,并只加载相关数据进行进一步处理(转换/操作)。
分区修剪:选择适当的列(在分区间均匀分布数据)对配置单元表进行分区。spark分区修剪与hive元存储一起有效地工作。它将根据查询的where子句中使用的partition\列只查找相关的分区。
predicate 下推:orc文件具有最小/最大索引和bloom过滤器。将工作的字符串列也在兽人(不知道最新的Parquet字符串支持),但更有效的数字列。当spark将筛选器下推到底层存储(orc文件)时,它将只读与筛选器匹配的行。下面是一个示例spark片段,用于创建这样的配置单元表(假设raw\u df是从原始数据创建的Dataframe)
sorted_df = raw_df .sort("column2")
sorted_df.write.mode("append").format("orc").partitionBy("column1").saveAsTable("hive_table_name")
这将根据column1值对数据进行分区,并在hdfs中保存orc文件并更新hive metastore。使用column2对表进行排序,假设我们将在查询where子句中使用column2然后您可以查询配置单元并用相关数据加载spark dataframe。下面是示例。
filtered_df = spark.sql('SELECT column1,column2,column3 FROM hive_table_name WHERE column1= "some_value1" AND column2= "some_value2"')
在上面的示例中,spark将只查看某个分区,因为column1是配置单元表中创建的分区列。然后spark会将orc文件中的column2的 predicate (即filter)“some\u value2”推送到“some\u value1”分区下。这里spark将只加载column1、column2、column3的值,甚至忽略表中的其他列。8qgya5xd2#
除非将第二种方法与更高级的存储布局结合起来(
bucketBy
/DISTRIBUTE BY
)它可以用来优化查询只要在方法1中不使用模式推理,这两者之间应该没有区别(必须为DataFrameReader
).bucketing可以用来优化bucketing列上的连接、聚合和过滤器的执行计划,但仍然使用spark执行所有操作。一般来说,spark只将配置单元用作元存储,而不是执行引擎。