package examples
import org.apache.log4j.Level
import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
object PartitionByExample extends App {
val logger = org.apache.log4j.Logger.getLogger("org")
logger.setLevel(Level.WARN)
val spark = SparkSession.builder.appName(getClass.getName)
.master("local[*]").getOrCreate
import spark.implicits._
//create a dataframe with demo data
val df = spark.sparkContext.parallelize(Seq(
(1, "Fname1", "Lname1", "Belarus"),
(2, "Fname1", "Lname1", "Belarus"),
(3, "Fname2", "Lname2", "Belgium"),
(3, "Fname2", "Lname2", "Belgium"),
(4, "Fname3", "Lname3", "Austria"),
(5, "Fname4", "Lname4", "Australia"),
(6, "Fname4", "Lname4", "Australia")
)).toDF("id", "fname", "lname", "country")
//save the data with partitionby first letter of country
df.write.mode("overwrite").partitionBy("country").format("com.databricks.spark.csv").save("outputpath")
import org.apache.spark.HashPartitioner
// this is paired rdd
val rddOneP: RDD[(String, String)] = df.rdd.map {
x => (x.getAs[String]("id"), x.getAs[String]("country"))
}
rddOneP.partitionBy(new HashPartitioner(6)).saveAsTextFile("outputpath1")
println(" partition By here .. .Return a copy of the RDD partitioned using the specified partitioner.")
}
1条答案
按热度按时间laximzn51#
spark中的分区是存储在集群节点上的数据块(数据的逻辑划分)。分区是apachespark中的基本并行单元。
partitionBy()
是一个DataFrameWriter
方法,指定是否应将数据写入文件夹中的磁盘。进一步读取-使用partitionby在磁盘上进行分区
更新:
考虑一下这个例子
案例1:
它的dataframe partitionby可以将相同的国家/地区信息写入一个csv文件
案例2
和你的风格搭配…:它将根据它试图存储/保存的元素的hashcode创建6个分区(因为我保存为文本文件)
如果可能的话,试着执行上面的话,你会理解你自己的。一个是数据框另一个是对等的。。。第二个和第二个一样好
repartition(6)
因为重分区使用hashpartitioner。如果你移除
并添加
到上面的程序,你会得到同样的输出。。。