pyspark 在内存中重新分区与文件

yh2wf1be  于 2023-08-02  发布在  Spark
关注(0)|答案(2)|浏览(134)

repartition()在内存中创建分区并用作read()操作。partitionBy()在磁盘中创建分区并用作写操作。
1.如何在使用repartition()时确认内存中有多个文件
1.如果重新分区只在内存中创建分区articles.repartition(1).write.saveAsTable("articles_table", format = 'orc', mode = 'overwrite'),为什么这个操作只创建一个文件?这与partitionBy()有什么不同?

nr7wwzry

nr7wwzry1#

partitionBy确实会影响文件在磁盘上的外观,并且在写入文件时确实会使用它(它是DataFrameWriter类的一个方法)。
但是,这并不意味着repartition对写入磁盘的内容完全没有影响。
让我们来看看下面的例子:

df = spark.createDataFrame([
  (1,2,3),
  (2,2,3),
  (3,20,300),
  (1,24,299),
  (5,26,312),
  (5,28,322),
  (5,9,2)
], ["colA", "colB", "colC"])

df.write.partitionBy("colA").parquet("using_partitionBy.parquet")
df.repartition(4).write.parquet("using_repartition.parquet")

字符串
在这里,我们创建了一个简单的dataframe,并使用两种方法将其写入:

1)使用partitionBy

磁盘上的输出文件结构如下所示:

tree using_partitionBy.parquet/
using_partitionBy.parquet/
├── colA=1
│   ├── part-00000-17e47d08-2bbe-4ad6-809e-c466b396de4e.c000.snappy.parquet
│   └── part-00002-17e47d08-2bbe-4ad6-809e-c466b396de4e.c000.snappy.parquet
├── colA=2
│   └── part-00001-17e47d08-2bbe-4ad6-809e-c466b396de4e.c000.snappy.parquet
├── colA=3
│   └── part-00001-17e47d08-2bbe-4ad6-809e-c466b396de4e.c000.snappy.parquet
├── colA=5
│   ├── part-00002-17e47d08-2bbe-4ad6-809e-c466b396de4e.c000.snappy.parquet
│   └── part-00003-17e47d08-2bbe-4ad6-809e-c466b396de4e.c000.snappy.parquet
└── _SUCCESS


我们看到这创建了6个“子文件”,在4个“子目录”中。关于数据值的信息(如colA=1)实际上存储在磁盘上。这使您能够在需要读取此文件的后续查询中进行重大改进。假设你需要读取colA=1的所有值,这将是一个微不足道的任务(忽略其他子目录)。

2)使用repartition(4)

磁盘上的输出文件结构如下所示:

tree using_repartition.parquet/
using_repartition.parquet/
├── part-00000-6dafa06d-cea6-4af0-ba0d-9c761de0fd50-c000.snappy.parquet
├── part-00001-6dafa06d-cea6-4af0-ba0d-9c761de0fd50-c000.snappy.parquet
├── part-00002-6dafa06d-cea6-4af0-ba0d-9c761de0fd50-c000.snappy.parquet
├── part-00003-6dafa06d-cea6-4af0-ba0d-9c761de0fd50-c000.snappy.parquet
└── _SUCCESS


我们看到创建了4个“子文件”,没有创建“子目录”。实际上,这些“子文件”代表Spark内部的分区。由于您通常在Spark中处理非常大的数据,因此所有数据都必须以某种方式进行分区。
每个分区将由1个任务处理,该任务可以由集群的1个核心占用。一旦这个任务被一个核心承担,在完成所有必要的处理之后,你的核心将把这个输出写到磁盘上的一个“子文件”中。当它完成写这个“子文件”时,您的核心就准备好读取另一个分区了。

partitionByrepartition什么时候使用

这是一个有点固执己见,当然不是详尽无遗的,但可能会给予你一些洞察力,什么使用。
partitionByrepartition可用于不同的目标:

  • 在以下情况下使用partitionBy
  • 您希望在磁盘上写入数据,并希望在磁盘上读取数据具有很大的性能优势。当你有一个列,你将做大量的过滤,其基数不是太高,这将是非常有用的
  • 在以下情况下使用repartition
  • 您希望根据集群大小调整分区大小,以提高作业的性能
  • 您希望写出一个分区大小合理的文件,但是在任何列上使用partitionBy都会导致基数过高(想象一下传感器上的时间序列数据)。
68bkxrlz

68bkxrlz2#

1.你的意思是如何确认,当你这样做,例如.repartition(100),你会得到100个文件的输出?我在SparkUI中检查了一下,任务数=分区数=写入文件数
1.使用.repartition(1),您将整个数据集移动到一个分区,该分区将由一个核心作为一个任务处理并写入一个文件。在paralell中无法处理单个任务,因此Spark别无选择,只能将所有内容存储在一个文件中

相关问题