scala Spark中每个任务的监听程序

kx5bkwkv  于 2022-11-09  发布在  Scala
关注(0)|答案(2)|浏览(267)

我正在使用ApacheSpark将数据从SQL Server读取到CSV,版本详细信息如下:

  • 实施‘com.microsoft.azure:spark-mssql-connector_2.12:1.2.0’
  • 实现‘org.apache.spark:spark-core_2.12:3.1.3’
  • 实现组:‘org.apache.spark’,名称:‘spark-SQL_2.12’,版本:‘3.1.3’

这里,每个导出到CSV的表数据通过以下可配置选项进一步拆分成多个任务:

  • “Low Bound”
  • “上行”
  • “numPartitions”
  • “分区列”

因此,假设NumPartition为5,则1个作业下将有5个任务
在以下方面寻求帮助:
在每个任务完成时,我需要执行一些特定于任务的操作(使用一些特定于任务的数据),那么有没有办法将一些侦听器连接到每个任务或作业?
我知道有一种方法可以通过extends SparkListener来挂钩侦听器,但这种方法可以与整个SparkContext挂钩,而SparkContext不能执行特定于任务的操作。

de90aj5v

de90aj5v1#

正如其他人已经指出的,没有办法将监听程序附加到特定的一组任务。但是,使用mapPartitions,您可以在处理数据集的分区之后(或之前)执行任意代码。正如所讨论的,in this answer分区和任务密切相关。
作为示例,使用了一个两列十行的简单CSV文件。目标是将第二列转换为大写,并在分区处理完成后立即打印一条消息。

id,column
1,a
2,b
[...]
10,j

代码:

val df = spark.read.option("header", true).option("inferSchema", true).csv(<file>)
  .repartition(5) //create 5 partitions with 2 rows each
df.mapPartitions(it => {
  var counter = 0;
  val result = it.toList.map(row => {
    counter = counter + 1;
    val resultForRow = row.getString(1).toUpperCase //the "business logic"
    (row.getInt(0), resultForRow)
  })
  println(s"${Thread.currentThread().getName()}:  I have processed ${counter} rows") //the code to be executed after a partition is done
  result.iterator
}).show()

产出:

Executor task launch worker for task 0.0 in stage 4.0 (TID 3):  I have processed 2 rows
Executor task launch worker for task 2.0 in stage 6.0 (TID 6):  I have processed 2 rows
Executor task launch worker for task 1.0 in stage 6.0 (TID 5):  I have processed 2 rows
Executor task launch worker for task 0.0 in stage 6.0 (TID 4):  I have processed 2 rows
Executor task launch worker for task 3.0 in stage 6.0 (TID 7):  I have processed 2 rows

mapPartitions中的代码在执行器中运行,因此上面的输出将出现在执行器日志中。

t98cgbkg

t98cgbkg2#

您无法将监听程序附加到任务。如果您有任何特定的逻辑要在作业完成后执行,那么最好将多个作业提交到Spark集群。希望这个能帮上忙!

相关问题