我在hdfs上有一个csv格式的输入文件,包含以下列: date, time, public_ip
使用它,我需要从相当大的表(每天约1亿行)中过滤出数据。该表的结构(大致如下):
CREATE TABLE big_table (
`user_id` int,
`ip` string,
`timestamp_from` timestamp,
`timestamp_to` timestamp)
PARTITIONED BY (`PARTITION_DATE` string)
ROW FORMAT SERDE
'org.apache.hadoop.hive.ql.io.orc.OrcSerde'
STORED AS INPUTFORMAT
'org.apache.hadoop.hive.ql.io.orc.OrcInputFormat';
我需要读取csv数据,然后过滤 big_table
正在检查哪些用户ID在所选期间使用了所需的ip地址。
我尝试使用sparksql与不同的连接,但运气不太好。无论我做什么,spark都不够“聪明”,无法限制每个分区的大表。我也试过用 WHERE PARTITION_DATE IN (SELECT DISTINCT date FROM csv_file
,但这也相当缓慢。
csv应该有20天左右的时间。以下是我的解决方案-我最终选择了不同的日期并将其用作字符串:
spark.sql("select date from csv_file group by date").createOrReplaceTempView("csv_file_uniq_date")
val partitions=spark.sql("select * from csv_file_uniq_date").collect.mkString(sep=",").replaceAll("[\\[\\]]","")
spark.sql("select user_id, timestamp_from, timestamp_to from big_table where partition_date in (" + partitions + ") group by user_id, timestamp_from, timestamp_to").write.csv("output.csv")
现在,这样做的工作-我削减了任务从10万到数千,但我觉得很不满意的实施。有人能给我指一下正确的方向吗?如何避免将其作为逗号分隔的分区值字符串拉取?
使用spark 2.2
干杯!
1条答案
按热度按时间gkn4icbw1#
你所期待的就是
Dynamic Partition Pruning
,从而spark将足够聪明地解析要从直接连接条件中过滤的分区。此功能可从
Spark 3.0
作为自适应查询执行改进的一部分。从该链接查找更多详细信息
默认禁用,可通过设置
spark.sql.adaptive.enabled=true