我们正在使用spark 2.2.0。我们在一个配置单元表中有1.5 tb的数据。我们有80个节点的集群,每个节点有大约512GB的ram和40个内核。
我正在使用sparksql访问这些数据。使用纯sparksql(不带缓存)的简单命令(比如获取特定列值的不同计数)大约需要13秒。但是当我在缓存表之后运行相同的命令时,它需要10分钟以上的时间。不确定是什么问题?
export SPARK_MAJOR_VERSION=2
spark-shell --master yarn --num-executors 40 --driver-memory 5g --executor-memory 100g --executor-cores 5
spark.conf.set("spark.sql.shuffle.partitions", 10)
val df = spark.sql("select * from analyticalprofiles.customer_v2")
df.createOrReplaceTempView("tmp")
spark.time(spark.sql("select count(distinct(household_number)) from tmp").show())
>> Time taken: 13927 ms
import org.apache.spark.storage.StorageLevel
val df2 = df.persist(StorageLevel.MEMORY_ONLY)
df2.createOrReplaceTempView("tmp2")
spark.time(spark.sql("select count(distinct(household_number)) from tmp2").show())
>> 1037482 ms ==> FIRST TIME - okay if this is more
spark.time(spark.sql("select count(distinct(household_number)) from tmp2").show())
>> 834740 ms ==> SECOND TIME - Was expecting much faster execution ???
尝试了与“spark.catalog.cachetable(“tmp”)相同的方法,但仍然使用缓存查询需要更多的时间。不知道为什么???有人能帮忙吗???
df2.storageLevel.useMemory
res6: Boolean = true
sc.getPersistentRDDs
res8: scala.collection.Map[Int,org.apache.spark.rdd.RDD[_]] = Map(12 -> In-memory table tmp MapPartitionsRDD[12] at cacheTable at <console>:24)
spark.conf.get("spark.sql.inMemoryColumnarStorage.compressed")
res11: String = true
spark.conf.get("spark.sql.inMemoryColumnarStorage.batchSize")
res12: String = 10000
spark.catalog.isCached("tmp")
res13: Boolean = true
1条答案
按热度按时间sczxawaw1#
你可以尝试以下方法。
您可以使用以下公式增加执行器的数量并减少执行器内存
如果要持久化Dataframe,请使用storagelevel.memory\和磁盘\ ser。如果内存(ram)已满,它将保存在磁盘中。
希望对你有帮助。