spark窗口配分函数需要很长时间才能完成

3htmauhk  于 2021-05-24  发布在  Spark
关注(0)|答案(3)|浏览(554)

给定一个Dataframe,我试图计算在过去30天内我看到一个emailid的次数。我的函数的主要逻辑如下:

val new_df = df
  .withColumn("transaction_timestamp", unix_timestamp($"timestamp").cast(LongType))

val winSpec = Window
  .partitionBy("email")
  .orderBy(col("transaction_timestamp"))
  .rangeBetween(-NumberOfSecondsIn30Days, Window.currentRow)

val resultDF = new_df
  .filter(col("condition"))
  .withColumn("count", count(col("email")).over(winSpec))

配置:

spark.executor.cores=5

因此,我可以看到5个阶段中有窗口功能,其中一些阶段完成得非常快(几秒钟内),还有2个甚至没有在3小时内完成,被困在最后几个任务中(进展非常缓慢):
这是一个数据倾斜的问题,如果我删除所有包含5个最高频率的行 email 从数据集中,作业很快完成(不到5分钟)。
如果尝试在window partitionby中使用其他键,作业将在几分钟内完成:

Window.partitionBy("email", "date")

但很明显,如果我这样做,它会执行错误的计数计算,这不是一个可接受的解决方案。
我尝试过其他各种Spark设置抛出更多的内存,核心,并行等,而这些似乎都没有帮助。
spark版本:2.2
当前spark配置:
-执行器内存:100g
-执行器核心:5
-驱动器内存:80g
-spark.executor.memory=100克
使用16核128GB内存的机器。最多500个节点。
解决这个问题的正确方法是什么?
更新:为了提供更多上下文,这里是原始Dataframe和相应的计算Dataframe:

val df = Seq(
      ("a@gmail.com", "2019-10-01 00:04:00"),
      ("a@gmail.com", "2019-11-02 01:04:00"), 
      ("a@gmail.com", "2019-11-22 02:04:00"),
      ("a@gmail.com", "2019-11-22 05:04:00"),
      ("a@gmail.com", "2019-12-02 03:04:00"),
      ("a@gmail.com", "2020-01-01 04:04:00"),
      ("a@gmail.com", "2020-03-11 05:04:00"),
      ("a@gmail.com", "2020-04-05 12:04:00"),
      ("b@gmail.com", "2020-05-03 03:04:00")  
    ).toDF("email", "transaction_timestamp")

val expectedDF = Seq(
      ("a@gmail.com", "2019-10-01 00:04:00", 1),
      ("a@gmail.com", "2019-11-02 01:04:00", 1), // prev one falls outside of last 30 days win
      ("a@gmail.com", "2019-11-22 02:04:00", 2),
      ("a@gmail.com", "2019-11-22 05:04:00", 3),
      ("a@gmail.com", "2019-12-02 03:04:00", 3),
      ("a@gmail.com", "2020-01-01 04:04:00", 1),
      ("a@gmail.com", "2020-03-11 05:04:00", 1),
      ("a@gmail.com", "2020-04-05 12:04:00", 2),
      ("b@gmail.com", "2020-05-03 03:04:00", 1) // new email
).toDF("email", "transaction_timestamp", count")
dwthyt8l

dwthyt8l1#

你是对的,这是一个数据倾斜的问题,减少窗口大小将有很大帮助。要想获得过去30天的信息,你不需要等到时代的开始。同样,如果构建一个带有时间索引的窗口,那么在每个窗口的开始处的计算将是错误的,因为它将无法访问上一个窗口。
我建议构建一个索引,每30天递增一次,两个重叠窗口大小为60天,如下图所示:

为了理解这是如何工作的,让我们考虑如图所示的一个数据点 index=2 . 如果您有一个30天大小的窗口,它将需要访问其窗口内和前一个窗口内的数据。那是不可能的。这就是为什么我们建立更大的窗口,以便我们可以访问所有的数据。如果我们考虑 win1 ,我们的问题与30天大小的索引相同。如果我们考虑 win2 但是,索引1的窗口中提供了所有数据。
对于索引为3的点,我们将使用 win1 . 对于索引为4的点, win2 基本上,对于偶数指数,我们使用 win2 . 对于奇数索引,我们使用 win1 . 这种方法将大大减少最大分区大小,从而减少单个任务中处理的最大数据量。
代码只是上面解释的翻译:

val winSize = NumberOfSecondsIn30Days

val win1 = Window
    .partitionBy("email", "index1")
    .orderBy(col("transaction_timestamp"))
    .rangeBetween(-winSize, Window.currentRow)
val win2 = Window
    .partitionBy("email", "index2")
    .orderBy(col("transaction_timestamp"))
    .rangeBetween(-winSize, Window.currentRow)

val indexed_df = new_df
    // the group by is only there in case there are duplicated timestamps,
    // so as to lighten the size of the windows
    .groupBy("email", "transaction_timestamp")
    .count()
    .withColumn("index",
        'transaction_timestamp / winSize cast "long")
    .withColumn("index1",
        ('transaction_timestamp / (winSize * 2)) cast "long")
    .withColumn("index2",
        (('transaction_timestamp + winSize) / (winSize * 2)) cast "long")

val result = indexed_df
    .withColumn("count", when(('index mod 2) === 0, sum('count) over win2)
                                      .otherwise(sum('count) over win1))
uqdfh47h

uqdfh47h2#

你的一些分区可能太大,这是由于事实上,有些电子邮件,有太多的数据在一个月。
要解决这个问题,您可以创建一个新的Dataframe,其中只包含电子邮件和时间戳。然后,通过电子邮件和时间戳分组,计算行数,并计算窗口中的数据,希望能少得多。如果时间戳趋向于重复,即 df.count 远大于 df.select("email", "timestamp").distinct.count . 如果不是这样,您可以截断时间戳,但代价是丢失一些精度。这样,您就不用计算过去30天内发生的次数(因为时间戳是以秒为单位的,所以只需一秒钟),而是根据需要计算发生的次数(一分钟、一小时甚至一天)。你会损失一点精度,但会大大加快计算速度。精度越高,速度就越快。
代码如下所示:

// 3600 means hourly precision.
// Set to 60 for minute precision, 1 for second precision, 24*3600 for one day.
// Note that even precisionLoss = 1 might make you gain speed depending on
// the distribution of your data
val precisionLoss = 3600 
val win_size = NumberOfSecondsIn30Days / precisionLoss

val winSpec = Window
  .partitionBy("email")
  .orderBy("truncated_timestamp")
  .rangeBetween(-win_size, Window.currentRow)

val new_df = df.withColumn("truncated_timestamp",
                      unix_timestamp($"timestamp") / 3600 cast "long")

val counts = new_df
  .groupBy("email", "truncated_timestamp")
  .count
  .withColumn("count", sum('count) over winSpec)

val result = new_df
  .join(counts, Seq("email", "truncated_timestamp"))
6pp0gazn

6pp0gazn3#

我们还是可以避开Windows的
对于上述df

val df2 = df.withColumn("timestamp", unix_timestamp($"transaction_timestamp").cast(LongType))

val df3 = df2.withColumnRenamed("timestamp","timestamp_2").drop("transaction_timestamp")

val finalCountDf = df2.join(df3,Seq("email"))
.withColumn("is_within_30", when( $"timestamp" - $"timestamp_2" < NumberOfSecondsIn30Days && $"timestamp" - $"timestamp_2" > 0 , 1).otherwise(0))
.groupBy("email","transaction_timestamp").agg(sum("is_within_30") as "count")
.withColumn("count",$"count"+1)

finalCountDf.orderBy("transaction_timestamp").show
/*
+-----------+---------------------+-----+
|      email|transaction_timestamp|count|
+-----------+---------------------+-----+
|a@gmail.com|  2019-10-01 00:04:00|    1|
|a@gmail.com|  2019-11-02 01:04:00|    1|
|a@gmail.com|  2019-11-22 02:04:00|    2|
|a@gmail.com|  2019-11-22 05:04:00|    3|
|a@gmail.com|  2019-12-02 03:04:00|    3|
|a@gmail.com|  2020-01-01 04:04:00|    1|
|a@gmail.com|  2020-03-11 05:04:00|    1|
|a@gmail.com|  2020-04-05 12:04:00|    2|
|b@gmail.com|  2020-05-03 03:04:00|    1|
+-----------+---------------------+-----+

* /

说明:
根据“email”制作成对的时间戳(join on email)
比较每一对并检查它是否在过去30天内:如果是,则标记为1或0
根据“电子邮件”和“交易时间戳”汇总计数
假设:(电子邮件,事务时间戳)是不同的。如果不是,我们可以通过添加单调递增ID来处理

相关问题