我正在使用spark submit命令执行下面的代码,该命令每5分钟从src hive表(hive从spark streaming作业获取数据,源是Kafka)获取数据,并在目标端的分区的最后2个小时内进行一些聚合。
val sparksql="insert OVERWRITE table hivedesttable(partition_date,partition_hour)
select "some business logic with aggregation and group by condition" from hivesrctable
where concat(partitionDate,":",partitionHour) in ${partitionDateHour}"
new Thread(new Runnable {
override def run(): Unit = {
while (true) {
val currentTs = java.time.LocalDateTime.now
var partitionDateHour = (0 until 2)
.map(h => currentTs.minusHours(h))
.map(ts => s"'${ts.toString.substring(0, 10)}${":"}${ts.toString.substring(11,13)}'")
.toList.toString().drop(4)
/** replacing ${partitionDateHour} value in query from current Thread value dynamically*/
val sparksql= spark_sql.replace("${partitionDateHour}",partitionDateHour)
spark.sql(sparksql)
Thread.sleep(300000)}}}).start()
scala.io.StdIn.readLine()
在显示下面的错误消息之前,它正常工作了5到6个小时。
Exception in thread "dispatcher-event-loop-5" in java.lang.OutOfMemoryError: GC overhead
limit exceeded
22/12/13 19:07:42 ERROR FileFormatWriter: Aborting job null.
org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
Exchange hashpartitioning
+- *HashAggregate
+- Exchange hashpartitioning
+- *HashAggregate
+- *Filter
我的Spark提交配置:
--conf "spark.hadoop.hive.exec.dynamic.partition=true"
--conf "spark.hadoop.hive.exec.dynamic.partition.mode=nonstrict"
--num-executors 1
--driver-cores 1
--driver-memory 1G
--executor-cores 1
--executor-memory 1G
我尝试增加-executor-memory,但问题仍然存在,我想知道是否有一种方法可以在每个线程之后释放GC进程的所有资源,以便释放不需要的对象和资源。
有谁能给我出个主意,怎么处理这种情况?
1条答案
按热度按时间nwsw7zdq1#
java.lang.OutOfMemoryError: GC Overhead limit exceeded
发生在Java进程花费超过98%的时间进行垃圾收集,并且回收不到2%的堆的情况下。可能有两个原因:(1.)内存泄漏-在大多数情况下,这是根本原因(2.)您没有使用足够的堆内存。
我建议做堆转储分析,看看你是否有内存泄漏。另外,试着增加内存,看看效果如何。