使用apachespark,我创建了一些关于法国城市的会计数据。
这个 Dataset
具有以下主要字段:
city_code, establishment_id, account_number, amount, city name, department
29045 , 2904521051 , 6105 , 23.51 , Dirinon , 29
29046 , 2907425498 , 4031 , 17.20 , Douarnenez, 29
它是按 orderBy("city_code", "establishment_id", "account_number")
然后被一个 repartition(col("department"))
他们在法国有100多个部门: 01
至 95
,以及一些特殊情况,如 2A
, 2B
,和 971
, 972
, 973
, 974
, 976
. 部门是由三个字符组成的字符串。
保存在 Parquet
文件夹。
我看了一下Parquet文件夹,发现里面有200个街区。
我有点惊讶:我不是应该找到大约100个吗?每个部门一个?
然后,我尝试一个查询。我要这个城市的会计数据 29046
. 它是函数中唯一一个返回其所有机构的相关帐户的参数。
我收到他们很好,但我的日志告诉我,我的Parquet文件的所有块都是红色的这样做。我原以为只需要一个:包含分区的那个 29
.
我开始怀疑自己:但我为什么要相信这一点?apachespark怎么知道任何一个城市的代码的形状 29
必须在有代码的分区中搜索 29
?
我不太懂分区,这里。我把一边的键和另一边的分区键搞混了:它们的链接不是那么紧密,我相信。
我在这里写了多少错误,我应该怎么做才能达到我想要的结果?
我已经尝试了麦克在回答中提出的改变。
从 comptes
首先,我用一个Parquet文件执行了这些操作,没有 substr(city_code, 1, 2)
对于分区:
comptes = comptes.orderBy("codeCommune", "siret", "numeroCompte");
comptes = comptes.withColumn("partitionCommune", substring(col("codeCommune"), 1, 2));
comptes = comptes.repartition(col("partitionCommune"));
comptes.write().parquet("myStore");
然后是对 codeCommune
29046
Dataset<Row> comptes = session.read().parquet("myStore");
comptes.where(col("codeCommune").equalTo("29046")).show();
数据集的生成和磁盘写入(200块)需要697秒。
城市代码的查询尝试 29046
读取所有的200个块并取9。
然后,我将编写parquet文件的行与这个行交换,并重新运行parquet文件的生成和查询。现在它正在创建一个带有分区的Parquet文件:
comptes.write().partitionBy("partitionCommune").parquet("myStore");
数据集的生成及其在磁盘上的写入(近500个块)需要875秒。
查询 city_code
29046读取所有500个块,也需要大约9秒。
(编辑:我有一个bug orderBy
在写入Parquet文件之前,transform不是最后一个,我的数据没有被排序,导致文件结尾有20000个块,因为试图将未排序的数据作为分区写入!)
所以,它起作用了。
2条答案
按热度按时间pgky5nke1#
从医生那里,如果你知道的话
df.repartition
基于列,并且不指定分区数,它将生成默认的分区数,即200。所以你有200个街区。作者的行为(
.write.partitionBy
)与Dataframe略有不同。它将创建的分区(也称为块/文件)的数量等于您指定的分区列中的不同值的数量(N
). 如果不指定分区,默认行为是将Dataframe的每个分区转储到一个单独的文件中。如果您指定了一个,那么行为将是将Dataframe的每个分区转储到N
单独的文件(不转储空分区)。要实现所需的功能,方法是指定Dataframe和写入程序的分区。例如
请注意,我想你可以按城市代码的前两位和部门进行划分。
这将产生相同数量的文件,前提是文件之间存在1:1的关系
department
以及city_code_first_two
.qcuzuvrc2#
“我希望spark在特定分区中搜索密钥,但事实并非如此”
spark不跟踪哪些键将被分配到哪个分区。这意味着,如果在用于分区的列中筛选某个特定值,它仍然必须遍历所有分区。
按列对数据集进行分区只会确保列中的相同值将被分配到同一分区,这可以在处理数据时提高性能,因为每个分区都是在单个执行器上进行窄转换的。
请记住,spark是一个分布式计算引擎,而不是一个可转位的数据存储。
如另一个答案中所述,您可以在存储数据时按一列或多列对数据进行分区。此外,您还可以存储数据。
顺便说一句,如果您发现200太多,您可以随时指定分区的数量