Spark fails to kill tasks that read/write to Azure SQL-DB

plicqrtu asked on 2023-03-17 in Spark

I am running an ETL-Pipeline on Azure-Databricks that writes cleaned data into an Azure-SQL-DB. During runtime, some tasks get killed because they get 'preempted by scheduler' (see executor-log below). My understanding is that this happens because Spark's fair-scheduling mode tries to free up resources for other tasks that should take precedence. Whenever the task to be killed happens to be a read from or write to the Azure-SQL-DB, Spark fails to kill the task, ultimately killing the whole executor.

This pattern repeats on all of the workers/executors, with every dead executor having pretty much the same log-entries as the one below.

Other tasks are still killed successfully throughout the pipeline-run. It only seems to happen to tasks that build a connection to the SQL-DB.

Databricks Cluster-Details:

  • 28 GB Memory, 8 Cores x 2 (Worker)
  • 14 GB Memory, 4 Cores x 1 (Driver)
  • Runtime 5.5 LTS (Spark 2.4.3)

Additional Spark-Config (through Databricks' spark-config window):

  • yarn.nodemanager.resource.cpu-vcores 7
  • spark.executor.cores 3
  • spark.executor.memory 10g
  • spark.executor.instances 3
  • yarn.nodemanager.resource.memory-mb 26329
  • spark.serializer org.apache.spark.serializer.KryoSerializer

This results in 4 executors with 3 cores and 5.5 GB storage memory each, plus the driver (spark.executor.instances seems to be ignored).

Code-Snippet:

The problem arises for tasks executing the following code-snippet.

current_csv.repartition(5).write.jdbc(mode='overwrite', url=jdbc_url, table=sql_tabl, properties=connection_properties)
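
For reference, jdbc_url and connection_properties in the snippet above are the usual Spark JDBC inputs. A minimal sketch of what they typically look like for an Azure-SQL-DB with the Microsoft JDBC driver (server, database and credentials below are hypothetical placeholders):

# Hypothetical placeholders; only the URL format and the driver class follow
# the standard Microsoft JDBC driver conventions for Azure SQL.
jdbc_url = (
    "jdbc:sqlserver://myserver.database.windows.net:1433;"
    "database=mydb;encrypt=true;trustServerCertificate=false;"
    "hostNameInCertificate=*.database.windows.net;loginTimeout=30"
)
connection_properties = {
    "user": "etl_user",
    "password": "<secret>",
    "driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver",
}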

However, this problem can also be reproduced on bigger/smaller clusters, when reading data, and it can be forced to happen by cancelling the task or the execution of the code manually (either through the Spark UI or the cancel button of the Databricks notebook cell). It affects pretty much every query that does not finish more or less instantly.
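
As a concrete example of the read path that shows the same behaviour (the table name is a hypothetical placeholder), cancelling the cell while such a query is still fetching produces the same "killed task is still running" pattern:

# A plain JDBC read; cancelling it mid-flight reproduces the failed kill.
df = spark.read.jdbc(url=jdbc_url, table="dbo.some_large_table",
                     properties=connection_properties)
df.count()  # forces the query to actually run on the executors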

Executor-Log:

19/09/10 03:03:50 INFO Executor: Executor is trying to kill task 3.0 in stage 484.0 (TID 16969), reason: preempted by scheduler
19/09/10 03:04:00 WARN Executor: Killed task 16969 is still running after 10002 ms
19/09/10 03:04:00 WARN Executor: Thread dump from task 16969:
...
19/09/10 03:04:10 WARN Executor: Killed task 16969 is still running after 20011 ms
19/09/10 03:04:10 WARN Executor: Thread dump from task 16969:
...
19/09/10 03:04:20 WARN Executor: Killed task 16969 is still running after 30013 ms
19/09/10 03:04:20 WARN Executor: Thread dump from task 16969:
...
19/09/10 03:04:30 WARN Executor: Killed task 16969 is still running after 40014 ms
19/09/10 03:04:30 WARN Executor: Thread dump from task 16969:
...
19/09/10 03:04:40 WARN Executor: Killed task 16969 is still running after 50016 ms
19/09/10 03:04:40 WARN Executor: Thread dump from task 16969:
...
19/09/10 03:04:50 WARN Executor: Killed task 16969 is still running after 60017 ms
19/09/10 03:04:50 WARN Executor: Thread dump from task 16969:
...
19/09/10 03:04:50 ERROR SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[Task reaper-0,5,main]
java.lang.Error: org.apache.spark.SparkException: Killing executor JVM because killed task 16969 could not be stopped within 60000 ms.
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1155)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Last Thread-Dump:

java.net.SocketInputStream.socketRead0(Native Method)
java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
java.net.SocketInputStream.read(SocketInputStream.java:171)
java.net.SocketInputStream.read(SocketInputStream.java:141)
com.microsoft.sqlserver.jdbc.TDSChannel$ProxyInputStream.readInternal(IOBuffer.java:1003)
com.microsoft.sqlserver.jdbc.TDSChannel$ProxyInputStream.read(IOBuffer.java:991)
sun.security.ssl.InputRecord.readFully(InputRecord.java:465)
sun.security.ssl.InputRecord.read(InputRecord.java:503)
sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:975)
sun.security.ssl.SSLSocketImpl.readDataRecord(SSLSocketImpl.java:933)
sun.security.ssl.AppInputStream.read(AppInputStream.java:105)
com.microsoft.sqlserver.jdbc.TDSChannel.read(IOBuffer.java:1981)
com.microsoft.sqlserver.jdbc.TDSReader.readPacket(IOBuffer.java:6310)
com.microsoft.sqlserver.jdbc.TDSReader.nextPacket(IOBuffer.java:6269)
com.microsoft.sqlserver.jdbc.TDSReader.ensurePayload(IOBuffer.java:6247)
com.microsoft.sqlserver.jdbc.TDSReader.readBytes(IOBuffer.java:6531)
com.microsoft.sqlserver.jdbc.TDSReader.readWrappedBytes(IOBuffer.java:6552)
com.microsoft.sqlserver.jdbc.TDSReader.readInt(IOBuffer.java:6499)
com.microsoft.sqlserver.jdbc.StreamRetStatus.setFromTDS(StreamRetStatus.java:30)
com.microsoft.sqlserver.jdbc.SQLServerStatement$1NextResult.onRetStatus(SQLServerStatement.java:1458)
com.microsoft.sqlserver.jdbc.TDSParser.parse(tdsparser.java:65)
com.microsoft.sqlserver.jdbc.SQLServerStatement.getNextResult(SQLServerStatement.java:1531)
com.microsoft.sqlserver.jdbc.SQLServerPreparedStatement.doExecutePreparedStatementBatch(SQLServerPreparedStatement.java:2482)
com.microsoft.sqlserver.jdbc.SQLServerPreparedStatement$PrepStmtBatchExecCmd.doExecute(SQLServerPreparedStatement.java:2383)
com.microsoft.sqlserver.jdbc.TDSCommand.execute(IOBuffer.java:7151)
com.microsoft.sqlserver.jdbc.SQLServerConnection.executeCommand(SQLServerConnection.java:2478)
com.microsoft.sqlserver.jdbc.SQLServerStatement.executeCommand(SQLServerStatement.java:219)
com.microsoft.sqlserver.jdbc.SQLServerStatement.executeStatement(SQLServerStatement.java:199)
com.microsoft.sqlserver.jdbc.SQLServerPreparedStatement.executeBatch(SQLServerPreparedStatement.java:2294)
org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.savePartition(JdbcUtils.scala:672)
org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$saveTable$1.apply(JdbcUtils.scala:839)
org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$saveTable$1.apply(JdbcUtils.scala:839)
org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:951)
org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:951)
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2284)
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2284)
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
org.apache.spark.scheduler.Task.doRunTask(Task.scala:139)
org.apache.spark.scheduler.Task.run(Task.scala:112)
org.apache.spark.executor.Executor$TaskRunner$$anonfun$13.apply(Executor.scala:497)
org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1526)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:503)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
java.lang.Thread.run(Thread.java:748)

I can circumvent this whole issue by simply disabling preemption, or by increasing the cluster to the point where preemption, and therefore killing tasks, is no longer necessary, but that seems to be a rather heavy-handed solution.
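
For completeness, this is roughly what I mean, entered in the Databricks spark-config window shown above. The Databricks preemption flag name is how I understand the Databricks docs and may need checking for the runtime in use; spark.task.reaper.killTimeout is the standard Spark setting behind the "60000 ms" in the log, with -1 meaning the executor JVM is never killed over an unstoppable task:

# Option A: turn off Databricks task preemption entirely
spark.databricks.preemption.enabled false

# Option B: keep preemption, but stop the task reaper from killing the whole
# executor JVM when a task ignores the kill request
spark.task.reaper.killTimeout -1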

Since I could not find anything regarding this issue:

  • Is there an obvious reason Spark seems to be unable to kill tasks building a connection to a SQL-DB?
  • Since I am not familiar with Java/Scala and ThreadDumps, is there any additional info in these Dumps that could give me a hint as to what is going on?
dohp0rv5 (Answer 1):

It should be a problem on the database's side. You need to look into the database's slow query log to see which query takes so much time.
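
Note that Azure SQL Database has no classic slow-query log; the usual substitutes are the Query Store or the dynamic management views. A sketch of checking the currently running statements from the same notebook, reusing the question's jdbc_url/connection_properties (the login needs VIEW DATABASE STATE on the database):

# Hypothetical sketch: wrap a DMV query as a JDBC subquery to see what is
# currently executing on the Azure SQL DB side and for how long.
running_requests = spark.read.jdbc(
    url=jdbc_url,
    table="""(SELECT r.session_id, r.status, r.wait_type,
                     r.total_elapsed_time, t.text AS sql_text
              FROM sys.dm_exec_requests AS r
              CROSS APPLY sys.dm_exec_sql_text(r.sql_handle) AS t) AS q""",
    properties=connection_properties,
)
running_requests.show(truncate=False)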

An explanation of why Spark kills tasks in the first place can be found at:

https://stackoverflow.com/questions/46652363/why-does-spark-kill-tasks
