I have a Spark Structured Streaming application (Spark 2.4.5) that consumes from Kafka. The application was down for a while, but when I restarted it, I got the error below.
I fully understand why I am getting the error, and I accept that. But I can't seem to get past it. In the logs I can see "Recovering from the earliest offset: 1234332978", yet that recovery does not seem to happen. I tried deleting the "sources" folder in the checkpoint location, but that didn't help either.
My code uses the mapGroupsWithState function, so I have state data that I don't want to lose; deleting the entire checkpoint directory is therefore not my preferred approach. I have set:
.option("failOnDataLoss", false).option("startingOffsets", "latest")
but this seems to apply only to new partitions.
Is there any way to tell Spark to accept the missing offsets and continue? Or some way to manually delete the offset data without affecting the application's state?
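For context, a minimal sketch of how such a Kafka source is typically wired up in Spark 2.4; the session setup and broker address are illustrative placeholders, not the original application code:

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .appName("cmusstats-stream") // placeholder name
      .getOrCreate()

    // Topic name taken from the log below; the broker address is a placeholder.
    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker:9092") // placeholder
      .option("subscribe", "cmusstats")
      // Do not fail the query when offsets are no longer available in Kafka.
      .option("failOnDataLoss", "false")
      // Only consulted when the query starts WITHOUT a checkpoint; once a
      // checkpoint exists, the checkpointed offsets take precedence.
      .option("startingOffsets", "latest")
      .load()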
20/07/29 01:02:40 WARN InternalKafkaConsumer: Cannot fetch offset 1215191190 (GroupId: spark-kafka-source-f9562fca-ab0c-4f7a-93c3-20506cbcdeb7--1440771761-executor, TopicPartition: cmusstats-0).
Some data may have been lost because they are not available in Kafka any more; either the
data was aged out by Kafka or the topic may have been deleted before all the data in the
topic was processed. If you want your streaming query to fail on such cases, set the source
option "failOnDataLoss" to "true".
org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets out of range with no configured reset policy for partitions: {cmusstats-0=1215191190}
at org.apache.kafka.clients.consumer.internals.Fetcher.parseCompletedFetch(Fetcher.java:970)
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:490)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1259)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1187)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1115)
at org.apache.spark.sql.kafka010.InternalKafkaConsumer.fetchData(KafkaDataConsumer.scala:470)
at org.apache.spark.sql.kafka010.InternalKafkaConsumer.org$apache$spark$sql$kafka010$InternalKafkaConsumer$$fetchRecord(KafkaDataConsumer.scala:361)
at org.apache.spark.sql.kafka010.InternalKafkaConsumer$$anonfun$get$1.apply(KafkaDataConsumer.scala:251)
at org.apache.spark.sql.kafka010.InternalKafkaConsumer$$anonfun$get$1.apply(KafkaDataConsumer.scala:234)
at org.apache.spark.util.UninterruptibleThread.runUninterruptibly(UninterruptibleThread.scala:77)
at org.apache.spark.sql.kafka010.InternalKafkaConsumer.runUninterruptiblyIfPossible(KafkaDataConsumer.scala:209)
at org.apache.spark.sql.kafka010.InternalKafkaConsumer.get(KafkaDataConsumer.scala:234)
at org.apache.spark.sql.kafka010.KafkaDataConsumer$class.get(KafkaDataConsumer.scala:64)
at org.apache.spark.sql.kafka010.KafkaDataConsumer$CachedKafkaDataConsumer.get(KafkaDataConsumer.scala:500)
at org.apache.spark.sql.kafka010.KafkaMicroBatchInputPartitionReader.next(KafkaMicroBatchReader.scala:357)
at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.hasNext(DataSourceRDD.scala:49)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:462)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
at org.apache.spark.scheduler.Task.run(Task.scala:123)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
20/07/29 01:02:40 WARN InternalKafkaConsumer: Some data may be lost. Recovering from the earliest offset: 1234332978
20/07/29 01:02:40 WARN InternalKafkaConsumer:
The current available offset range is AvailableOffsetRange(1234332978,1328165875).
Offset 1215191190 is out of range, and records in [1215191190, 1215691190) will be
skipped (GroupId: spark-kafka-source-f9562fca-ab0c-4f7a-93c3-20506cbcdeb7--1440771761-executor, TopicPartition: cmusstats-0).
Some data may have been lost because they are not available in Kafka any more; either the
data was aged out by Kafka or the topic may have been deleted before all the data in the
topic was processed. If you want your streaming query to fail on such cases, set the source
option "failOnDataLoss" to "true".
1 Answer
As it turned out, the Structured Streaming application eventually recovered on its own. For a while, many "Cannot fetch offset" errors were logged, but after some time processing continued from the earliest available offset.
I can't explain why so many of these errors appeared before processing resumed, but in the end it did continue.
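One way to confirm up front that a checkpointed offset has been aged out is to compare it against the offset range Kafka still retains. A minimal sketch using the plain Kafka consumer API (broker address and group id are placeholders):

    import java.util.Properties
    import scala.collection.JavaConverters._
    import org.apache.kafka.clients.consumer.KafkaConsumer
    import org.apache.kafka.common.TopicPartition
    import org.apache.kafka.common.serialization.ByteArrayDeserializer

    val props = new Properties()
    props.put("bootstrap.servers", "broker:9092")  // placeholder
    props.put("group.id", "offset-range-check")    // placeholder, read-only use
    props.put("key.deserializer", classOf[ByteArrayDeserializer].getName)
    props.put("value.deserializer", classOf[ByteArrayDeserializer].getName)

    val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](props)
    try {
      val tp = new TopicPartition("cmusstats", 0)
      // beginningOffsets returns the earliest offset still retained per partition.
      val earliest = consumer.beginningOffsets(Seq(tp).asJava).get(tp)
      val latest   = consumer.endOffsets(Seq(tp).asJava).get(tp)
      // If the offset stored in the checkpoint is below `earliest`, Kafka has
      // aged that data out and the stream will skip ahead, as in the log above.
      println(s"Available offset range for $tp: [$earliest, $latest)")
    } finally {
      consumer.close()
    }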