2 executors doing "all" of the work

9avjhtql posted on 2021-07-14 in Spark

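I have been trying to figure this out for the past day, without success.

The problem I am facing

I am reading a Parquet file that is roughly 2 GB in size. The initial read produces 14 partitions, which eventually get split into 200 partitions. I run a seemingly simple SQL query that takes more than 25 minutes, with a single stage accounting for about 22 minutes. In the Spark UI I can see that all of the computation ends up being pushed onto roughly 2 to 4 executors, with a lot of shuffling. I do not know what is going on. Any help would be appreciated.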

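Setup

Spark environment - Databricks
Cluster mode - Standard
Databricks runtime version - 6.4 ML (includes Apache Spark 2.4.5, Scala 2.11)
Cloud - Azure
Worker type - 56 GB, 16 cores per machine. Minimum of 2 machines
Driver type - 112 GB, 16 cores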
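
For scale, the figures quoted in this post can be worked through as a back-of-the-envelope calculation. This is only a sketch: it assumes exactly 2 GB of input, the minimum of 2 workers, and one task per core (Spark's default of spark.task.cpus = 1). The variable names are made up for illustration. Note that 200 is also Spark's default value for spark.sql.shuffle.partitions, which may be where the final partition count comes from.

```r
# Figures taken from the question; variable names are illustrative only.
file_size_mb     = 2 * 1024   # "roughly 2 GB" Parquet file
input_partitions = 14         # partitions after the initial read
final_partitions = 200        # partitions after the shuffle
min_workers      = 2          # minimum cluster size
cores_per_worker = 16

# Average amount of input handled by each of the 14 read tasks.
avg_input_partition_mb = file_size_mb / input_partitions

# Tasks that can run at once on the minimum cluster,
# assuming one task per core (spark.task.cpus = 1).
min_task_slots = min_workers * cores_per_worker

# Rounds of tasks needed to work through 200 partitions
# if every slot were kept busy.
task_waves = ceiling(final_partitions / min_task_slots)

cat("Average input partition (MB):", round(avg_input_partition_mb, 1), "\n")
cat("Concurrent task slots:", min_task_slots, "\n")
cat("Task waves for the shuffle stage:", task_waves, "\n")
```

If the 200 partitions were of similar size, the stage would need only a handful of task waves on this cluster, so a single 22-minute stage suggests that the work per partition may be far from even.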

Notebook

Cell 1: Helper functions

load_data = function(path, type) {
  input_df = read.df(path, type)

  input_df = withColumn(input_df, "dummy_col", 1L)
  createOrReplaceTempView(input_df, "__current_exp_data")

  ## Helper function to run query, then save as table
  transformation_helper = function(sql_query, destination_table) {
    createOrReplaceTempView(sql(sql_query), destination_table)
  }

  ## Transformation 0: Calculate max date, used for calculations later on
  transformation_helper(
    "SELECT 1L AS dummy_col, MAX(Date) max_date FROM __current_exp_data",
    destination_table = "__max_date"
  )

  ## Transformation 1: Make initial column calculations
  transformation_helper(
    "
    SELECT
        cId                                                          AS cId
      , date_format(Date, 'yyyy-MM-dd')                              AS Date
      , date_format(DateEntered, 'yyyy-MM-dd')                       AS DateEntered
      , eId

      , (CASE WHEN isnan(tSec) OR isnull(tSec) THEN 0 ELSE tSec END) AS tSec
      , (CASE WHEN isnan(eSec) OR isnull(eSec) THEN 0 ELSE eSec END) AS eSec

      , approx_count_distinct(eId) OVER (PARTITION BY cId)           AS dc_eId
      , COUNT(*)                   OVER (PARTITION BY cId, Date)     AS num_rec

      , datediff(Date, DateEntered)                                  AS analysis_day
      , datediff(max_date, DateEntered)                              AS total_avail_days
    FROM __current_exp_data
    CROSS JOIN __max_date ON __current_exp_data.dummy_col = __max_date.dummy_col
  ",
    destination_table = "current_exp_data_raw"
  )

  ## Transformation 2: Drop row if Date is not valid
  transformation_helper(
    "
    SELECT 
        cId
      , Date
      , DateEntered
      , eId
      , tSec
      , eSec

      , analysis_day
      , total_avail_days
      , CASE WHEN analysis_day     == 0 THEN 0    ELSE floor((analysis_day - 1) / 7)   END AS week
      , CASE WHEN total_avail_days  < 7 THEN NULL ELSE floor(total_avail_days / 7) - 1 END AS avail_week
    FROM current_exp_data_raw
    WHERE 
        isnotnull(Date) AND 
        NOT isnan(Date) AND 
        Date >= DateEntered AND
        dc_eId == 1 AND
        num_rec == 1
  ",
    destination_table = "main_data"
  )

  cacheTable("current_exp_data_raw")
  cacheTable("main_data")
}

spark_sql_as_data_table = function(query) {
  data.table(collect(sql(query)))
}

get_distinct_weeks = function() {
  spark_sql_as_data_table("SELECT week FROM main_data GROUP BY week")
}

Cell 2: Call the helper functions to trigger the long-running task

library(data.table)
library(SparkR)

spark = sparkR.session(sparkConfig = list())

load_data("/mnt/public-dir/file_0000000.parquet", "parquet")

set.seed(1234)

get_distinct_weeks()
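
Because the window functions in Transformation 1 partition by cId, one thing that could be checked is how many rows fall under each cId value. The helpers below are a hypothetical sketch and not part of the original notebook: build_key_count_query and top_keys are invented names, and the calls to SparkR::sql and SparkR::collect assume the SparkR session and temp views created by the cells above.

```r
# Hypothetical helpers, not part of the original notebook.

# Build a query that counts rows per key value, largest groups first.
build_key_count_query = function(view, key, top_n = 20L) {
  sprintf(
    "SELECT %s AS key_value, COUNT(*) AS n_rows FROM %s GROUP BY %s ORDER BY n_rows DESC LIMIT %d",
    key, view, key, as.integer(top_n)
  )
}

# Run the count in Spark and return the small result as an R data.frame.
# Requires an active SparkR session and an existing temp view.
top_keys = function(view, key, top_n = 20L) {
  SparkR::collect(SparkR::sql(build_key_count_query(view, key, top_n)))
}

# Example usage (needs the session and views from the cells above):
# top_keys("__current_exp_data", "cId")
# SparkR::sparkR.conf("spark.sql.shuffle.partitions")
# SparkR::getNumPartitions(SparkR::tableToDF("main_data"))
```

If a few cId values account for most of the rows, that would be consistent with a small number of tasks doing most of the work, although this check alone does not confirm the cause.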

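Long-running stage

Long-running stage statistics

Logs

I trimmed the log so that it shows only the entries below, each of which appears many times.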

BlockManager: Found block rdd_22_113 locally

CoarseGrainedExecutorBackend: Got assigned task 812

ExternalAppendOnlyUnsafeRowArray: Reached spill threshold of 4096 rows, switching to org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter

InMemoryTableScanExec: Predicate (dc_eId#61L = 1) generates partition filter: ((dc_eId.lowerBound#622L <= 1) && (1 <= dc_eId.upperBound#621L))

InMemoryTableScanExec: Predicate (num_rec#62L = 1) generates partition filter: ((num_rec.lowerBound#627L <= 1) && (1 <= num_rec.upperBound#626L))

InMemoryTableScanExec: Predicate isnotnull(Date#57) generates partition filter: ((Date.count#599 - Date.nullCount#598) > 0)

InMemoryTableScanExec: Predicate isnotnull(DateEntered#58) generates partition filter: ((DateEntered.count#604 - DateEntered.nullCount#603) > 0)

MemoryStore: Block rdd_17_104 stored as values in memory (estimated size <VERY SMALL NUMBER < 10> MB, free 10.0 GB)

ShuffleBlockFetcherIterator: Getting 200 non-empty blocks including 176 local blocks and 24 remote blocks

ShuffleBlockFetcherIterator: Started 4 remote fetches in 1 ms

UnsafeExternalSorter: Thread 254 spilling sort data of <Between 1 and 3 GB> to disk (3  times so far)
