线程“调度程序事件循环5”java.lang中出现异常,内存不足错误:超出GC开销限制:Spark

yqkkidmi  于 2022-12-19  发布在  Apache
关注(0)|答案(1)|浏览(180)

我正在使用spark submit命令执行下面的代码,该命令每5分钟从src hive表(hive从spark streaming作业获取数据,源是Kafka)获取数据,并在目标端的分区的最后2个小时内进行一些聚合。

val sparksql="insert OVERWRITE table  hivedesttable(partition_date,partition_hour) 
    select "some business logic with aggregation and group by condition" from hivesrctable
    where concat(partitionDate,":",partitionHour) in ${partitionDateHour}"

new Thread(new Runnable {
override def run(): Unit = {
while (true) {
val currentTs = java.time.LocalDateTime.now
var partitionDateHour = (0 until 2)
.map(h => currentTs.minusHours(h))
.map(ts => s"'${ts.toString.substring(0, 10)}${":"}${ts.toString.substring(11,13)}'")
.toList.toString().drop(4)

/** replacing ${partitionDateHour} value in query from current Thread value dynamically*/
  val sparksql=  spark_sql.replace("${partitionDateHour}",partitionDateHour)
  spark.sql(sparksql)
  Thread.sleep(300000)}}}).start()
  scala.io.StdIn.readLine()

在显示下面的错误消息之前,它正常工作了5到6个小时。

Exception in thread "dispatcher-event-loop-5" in java.lang.OutOfMemoryError: GC overhead 
limit exceeded
22/12/13 19:07:42 ERROR FileFormatWriter: Aborting job null.
org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
Exchange hashpartitioning
+- *HashAggregate
+- Exchange hashpartitioning
+- *HashAggregate
+- *Filter

我的Spark提交配置:

--conf "spark.hadoop.hive.exec.dynamic.partition=true"
--conf "spark.hadoop.hive.exec.dynamic.partition.mode=nonstrict" 
--num-executors 1
--driver-cores 1 
--driver-memory 1G  
--executor-cores 1  
--executor-memory 1G

我尝试增加-executor-memory,但问题仍然存在,我想知道是否有一种方法可以在每个线程之后释放GC进程的所有资源,以便释放不需要的对象和资源。
有谁能给我出个主意,怎么处理这种情况?

nwsw7zdq

nwsw7zdq1#

java.lang.OutOfMemoryError: GC Overhead limit exceeded发生在Java进程花费超过98%的时间进行垃圾收集,并且回收不到2%的堆的情况下。
可能有两个原因:(1.)内存泄漏-在大多数情况下,这是根本原因(2.)您没有使用足够的堆内存。
我建议做堆转储分析,看看你是否有内存泄漏。另外,试着增加内存,看看效果如何。

相关问题