我们有一个基于hadoop的解决方案(cdh5.15),可以在hdfs中的一些目录中获取新文件。在这些目录之上,我们有4-5个impala(2.1)表。在hdfs中写入这些文件的过程是spark结构化流(2.3.1)
现在,我们正在运行一些ddl查询,只要我们将文件写入hdfs: ALTER TABLE table1 RECOVER PARTITONS
检测添加到表中的新分区(及其hdfs目录和文件)。 REFRESH table1 PARTITIONS (partition1=X, partition2=Y)
,使用每个分区的所有键。
现在,这个ddl占用的时间有点太长,它们在我们的系统中排队,破坏了系统的数据可用性。
所以,我的问题是:有没有一种方法可以更有效地进行数据合并?
我们考虑过:
使用 ALTER TABLE .. RECOVER PARTITONS
但根据文档,它只刷新新分区。
试图使用 REFRESH .. PARTITON ...
但语句syntaxis不允许这样做。
已尝试批处理查询,但配置单元jdbc驱动器不支持批处理查询。
考虑到系统已经很忙,我们是否应该尝试同时进行这些更新?
你还知道其他的方法吗?
谢谢!
胜利者
注意:我们知道哪些分区需要刷新的方法是使用hdfs事件,就像我们不知道的spark结构化流一样´我不知道文件是什么时候写的。
注意#2:另外,用hdfs编写的文件有时很小,因此如果可以同时合并这些文件就更好了。
1条答案
按热度按时间hfyxw5xn1#
由于似乎没有人知道我的问题的答案,我想与大家分享我们采取的方法,使这个处理更有效,评论是非常欢迎的。
我们发现了(医生。不太清楚)hdfs中spark“checkpoints”中存储的一些信息是一些元数据文件,描述了每个Parquet地板文件的写入时间和大小:
所以,我们做的是:
构建一个spark流作业
_spark_metadata
文件夹。我们使用
fileStream
因为它允许我们定义要使用的文件过滤器。该流中的每个条目都是这些json行中的一行,通过解析来提取文件路径和大小。
按文件所属的父文件夹(Map到每个impala分区)对文件进行分组。
对于每个文件夹:
读取仅加载目标Parquet文件的Dataframe(以避免与写入文件的其他作业发生竞争)
计算要写入的块数(使用json中的size字段和目标块大小)
将Dataframe合并到所需的分区数并将其写回hdfs
执行ddl
REFRESH TABLE myTable PARTITION ([partition keys derived from the new folder]
最后,删除源文件我们取得的成就是:
通过对每个分区和批执行一次刷新来限制DDL。
通过配置批处理时间和块大小,我们能够使产品适应具有较大或较小数据集的不同部署场景。
这个解决方案非常灵活,因为我们可以为spark流作业分配更多或更少的资源(执行器、内核、内存等),也可以启动/停止它(使用它自己的检查点系统)。
我们还研究了在执行此过程时应用一些数据重新分区的可能性,以使分区尽可能接近最佳大小。