Spark Structured Streaming checkpoint issue with Azure Blob storage: Error in TaskCompletionListener (null)

Posted by pokxtpni on 2021-07-13 in Spark

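I am using the checkpointing feature of Spark Structured Streaming and storing the checkpoint metadata in Azure Blob storage.
However, I get the error below. From the logs, it looks as if a temporary file is deleted and something then tries to access it again.
The detailed logs and the logical plan follow.
Logs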

2021-02-10 20:28:55.633 DEBUG --- [Executor task launch worker for task 15] org.apache.hadoop.fs.azure.NativeAzureFileSystem : Delete Successful for : wasb://mosaic-k8s-hdi-container@mosaick8shdistorage.blob.core.windows.net/k8s_performance/streamCheckpoint/c0c92f4f-2708-412c-82c1-053b32ba63c9pot5/MYSQL1/state/0/2/.1.delta.3554a10d-9d96-4ddb-84f1-a49334757dc9.TID15.tmp
2021-02-10 20:28:55.634 ERROR --- [Executor task launch worker for task 15] org.apache.spark.internal.Logging$class : Error in TaskCompletionListener
java.lang.NullPointerException
    at org.apache.hadoop.fs.azure.NativeAzureFileSystem$NativeAzureFsOutputStream.write(NativeAzureFileSystem.java:1140)
    at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.write(FSDataOutputStream.java:58)
    at java.io.DataOutputStream.write(DataOutputStream.java:107)
    at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.write(FSDataOutputStream.java:58)
    at java.io.DataOutputStream.write(DataOutputStream.java:107)
    at net.jpountz.lz4.LZ4BlockOutputStream.finish(LZ4BlockOutputStream.java:258)
    at net.jpountz.lz4.LZ4BlockOutputStream.close(LZ4BlockOutputStream.java:190)
    at java.io.FilterOutputStream.close(FilterOutputStream.java:159)
    at org.apache.commons.io.IOUtils.closeQuietly(IOUtils.java:363)
    at org.apache.commons.io.IOUtils.closeQuietly(IOUtils.java:317)
    at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$cancelDeltaFile(HDFSBackedStateStoreProvider.scala:508)
    at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$HDFSBackedStateStore.abort(HDFSBackedStateStoreProvider.scala:150)
    at org.apache.spark.sql.execution.streaming.state.package$StateStoreOps$$anonfun$1$$anonfun$apply$1.apply(package.scala:65)
    at org.apache.spark.sql.execution.streaming.state.package$StateStoreOps$$anonfun$1$$anonfun$apply$1.apply(package.scala:64)
    at org.apache.spark.TaskContext$$anon$1.onTaskCompletion(TaskContext.scala:131)
    at org.apache.spark.TaskContextImpl$$anonfun$markTaskCompleted$1.apply(TaskContextImpl.scala:117)
    at org.apache.spark.TaskContextImpl$$anonfun$markTaskCompleted$1.apply(TaskContextImpl.scala:117)
    at org.apache.spark.TaskContextImpl$$anonfun$invokeListeners$1.apply(TaskContextImpl.scala:130)
    at org.apache.spark.TaskContextImpl$$anonfun$invokeListeners$1.apply(TaskContextImpl.scala:128)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.TaskContextImpl.invokeListeners(TaskContextImpl.scala:128)
    at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:116)
    at org.apache.spark.scheduler.Task.run(Task.scala:139)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
2021-02-10 20:28:55.639 ERROR --- [Executor task launch worker for task 15] org.apache.spark.internal.Logging$class : Exception in task 2.0 in stage 3.0 (TID 15)
org.apache.spark.util.TaskCompletionListenerException: null
    at org.apache.spark.TaskContextImpl.invokeListeners(TaskContextImpl.scala:138)
    at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:116)
    at org.apache.spark.scheduler.Task.run(Task.scala:139)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
2021-02-10 20:28:55.658 DEBUG --- [dispatcher-event-loop-0] org.apache.spark.internal.Logging$class : parentName: , name: TaskSet_3.0, runningTasks: 3
2021-02-10 20:28:55.659 DEBUG --- [dispatcher-event-loop-0] org.apache.spark.internal.Logging$class : No tasks for locality level NO_PREF, so moving to locality level ANY
2021-02-10 20:28:55.661 WARN  --- [task-result-getter-1] org.apache.spark.internal.Logging$class : Lost task 2.0 in stage 3.0 (TID 15, localhost, executor driver): org.apache.spark.util.TaskCompletionListenerException: null
    at org.apache.spark.TaskContextImpl.invokeListeners(TaskContextImpl.scala:138)
    at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:116)
    at org.apache.spark.scheduler.Task.run(Task.scala:139)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

2021-02-10 20:28:55.663 ERROR --- [task-result-getter-1] org.apache.spark.internal.Logging$class : Task 2 in stage 3.0 failed 1 times; aborting job
2021-02-10 20:28:55.672 INFO  --- [dag-scheduler-event-loop] org.apache.spark.internal.Logging$class : Cancelling stage 3
2021-02-10 20:28:55.673 INFO  --- [dag-scheduler-event-loop] org.apache.spark.internal.Logging$class : Killing all running tasks in stage 3: Stage cancelled
2021-02-10 20:28:55.678 INFO  --- [dispatcher-event-loop-2] org.apache.spark.internal.Logging$class : Executor is trying to kill task 3.0 in stage 3.0 (TID 16), reason: Stage cancelled
2021-02-10 20:28:55.679 INFO  --- [dispatcher-event-loop-2] org.apache.spark.internal.Logging$class : Executor is trying to kill task 0.0 in stage 3.0 (TID 13), reason: Stage cancelled
2021-02-10 20:28:55.679 INFO  --- [dispatcher-event-loop-2] org.apache.spark.internal.Logging$class : Executor is trying to kill task 1.0 in stage 3.0 (TID 14), reason: Stage cancelled
2021-02-10 20:28:55.680 INFO  --- [dag-scheduler-event-loop] org.apache.spark.internal.Logging$class : Stage 3 was cancelled
2021-02-10 20:28:55.681 INFO  --- [dag-scheduler-event-loop] org.apache.spark.internal.Logging$class : ResultStage 3 (start at taskCompletion.java:414) failed in 47.809 s due to Job aborted due to stage failure: Task 2 in stage 3.0 failed 1 times, most recent failure: Lost task 2.0 in stage 3.0 (TID 15, localhost, executor driver): org.apache.spark.util.TaskCompletionListenerException: null
    at org.apache.spark.TaskContextImpl.invokeListeners(TaskContextImpl.scala:138)
    at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:116)
    at org.apache.spark.scheduler.Task.run(Task.scala:139)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
2021-02-10 20:28:55.687 DEBUG --- [dag-scheduler-event-loop] org.apache.spark.internal.Logging$class : After removal of stage 2, remaining stages = 3
2021-02-10 20:28:55.687 DEBUG --- [dag-scheduler-event-loop] org.apache.spark.internal.Logging$class : After removal of stage 1, remaining stages = 2
2021-02-10 20:28:55.687 DEBUG --- [dag-scheduler-event-loop] org.apache.spark.internal.Logging$class : After removal of stage 3, remaining stages = 1
2021-02-10 20:28:55.688 DEBUG --- [dag-scheduler-event-loop] org.apache.spark.internal.Logging$class : After removal of stage 0, remaining stages = 0
2021-02-10 20:28:55.689 INFO  --- [stream execution thread for [id = 3e8b8ac2-f836-44e1-b71f-9e0c030ccc79, runId = 2222948c-53e5-411a-ad22-7083bc10ed5f]] org.apache.spark.internal.Logging$class : Job 0 failed: start at taskCompletion.java:414, took 244.059233 s
2021-02-10 20:28:55.691 ERROR --- [stream execution thread for [id = 3e8b8ac2-f836-44e1-b71f-9e0c030ccc79, runId = 2222948c-53e5-411a-ad22-7083bc10ed5f]] org.apache.spark.internal.Logging$class : Aborting job 0b63e841-559f-4106-a3f8-f680fc7fdfcc.
org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 3.0 failed 1 times, most recent failure: Lost task 2.0 in stage 3.0 (TID 15, localhost, executor driver): org.apache.spark.util.TaskCompletionListenerException: null
    at org.apache.spark.TaskContextImpl.invokeListeners(TaskContextImpl.scala:138)
    at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:116)
    at org.apache.spark.scheduler.Task.run(Task.scala:139)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1889)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1877)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1876)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1876)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2110)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2059)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2048)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:167)
    at org.apache.spark.sql.execution.streaming.FileStreamSink.addBatch(FileStreamSink.scala:131)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$5$$anonfun$apply$17.apply(MicroBatchExecution.scala:537)
    at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$5.apply(MicroBatchExecution.scala:535)
    at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:534)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:198)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
    at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:166)
    at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:160)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:281)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:193)
Caused by: org.apache.spark.util.TaskCompletionListenerException: null
    at org.apache.spark.TaskContextImpl.invokeListeners(TaskContextImpl.scala:138)
    at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:116)
    at org.apache.spark.scheduler.Task.run(Task.scala:139)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Logical plan

=== Streaming Query ===
Identifier: [id = 3e8b8ac2-f836-44e1-b71f-9e0c030ccc79, runId = 2222948c-53e5-411a-ad22-7083bc10ed5f]
Current Committed Offsets: {}
Current Available Offsets: {org.apache.spark.sql.eventhubs.EventHubsSource@5a86ea54: {"test":{"0":49743985}},org.apache.spark.sql.eventhubs.EventHubsSource@108ea531: {"sample":{"2":155436179,"5":155434270,"4":155415112,"7":155434738,"1":155439493,"3":155417225,"6":155429651,"0":155439349}}}

Current State: ACTIVE
Thread State: RUNNABLE

Logical Plan:
Project [GrpCol#1515, COL_1_COUNT#2554L, window#2136-T60000ms.start AS start#2561, window#2136-T60000ms.end AS end#2562]
+- SubqueryAlias `groupby1`
   +- Project [GrpCol#1515, window#2136-T60000ms, count(COL_1)#2549L AS COL_1_COUNT#2554L]
      +- Aggregate [GrpCol#1515, window#2550-T60000ms], [GrpCol#1515, window#2550-T60000ms AS window#2136-T60000ms, count(COL_1#1310L) AS count(COL_1)#2549L]
         +- Filter isnotnull(TIMESTAMP_COL_3#1514-T60000ms)
            +- Project [named_struct(start, precisetimestampconversion(((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(TIMESTAMP_COL_3#1514-T60000ms, TimestampType, LongType) - 0) as double) / cast(60000000 as double))) as double) = (cast((precisetimestampconversion(TIMESTAMP_COL_3#1514-T60000ms, TimestampType, LongType) - 0) as double) / cast(60000000 as double))) THEN (CEIL((cast((precisetimestampconversion(TIMESTAMP_COL_3#1514-T60000ms, TimestampType, LongType) - 0) as double) / cast(60000000 as double))) + cast(1 as bigint)) ELSE CEIL((cast((precisetimestampconversion(TIMESTAMP_COL_3#1514-T60000ms, TimestampType, LongType) - 0) as double) / cast(60000000 as double))) END + cast(0 as bigint)) - cast(1 as bigint)) * 60000000) + 0), LongType, TimestampType), end, precisetimestampconversion((((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(TIMESTAMP_COL_3#1514-T60000ms, TimestampType, LongType) - 0) as double) / cast(60000000 as double))) as double) = (cast((precisetimestampconversion(TIMESTAMP_COL_3#1514-T60000ms, TimestampType, LongType) - 0) as double) / cast(60000000 as double))) THEN (CEIL((cast((precisetimestampconversion(TIMESTAMP_COL_3#1514-T60000ms, TimestampType, LongType) - 0) as double) / cast(60000000 as double))) + cast(1 as bigint)) ELSE CEIL((cast((precisetimestampconversion(TIMESTAMP_COL_3#1514-T60000ms, TimestampType, LongType) - 0) as double) / cast(60000000 as double))) END + cast(0 as bigint)) - cast(1 as bigint)) * 60000000) + 0) + 60000000), LongType, TimestampType)) AS window#2550-T60000ms, COL_1#1310L, COL_10#1311, COL_100#1312, COL_101#1313L, COL_102#1314L, COL_103#1315L, COL_104#1316L, COL_105#1317, COL_106#1318L, COL_107#1319L, COL_108#1320L, COL_109#1321, COL_11#1322L, COL_110#1323, COL_111#1324, COL_112#1325, COL_113#1326L, COL_114#1327, COL_115#1328L, COL_116#1329L, COL_117#1330L, COL_118#1331, COL_119#1332, ... 389 more fields]
               +- EventTimeWatermark TIMESTAMP_COL_3#1514: timestamp, interval 1 minutes
                  +- Project [COL_1#48L AS COL_1#1310L, COL_10#49 AS COL_10#1311, COL_100#50 AS COL_100#1312, COL_101#51L AS COL_101#1313L, COL_102#52L AS COL_102#1314L, COL_103#53L AS COL_103#1315L, COL_104#54L AS COL_104#1316L, COL_105#55 AS COL_105#1317, COL_106#56L AS COL_106#1318L, COL_107#57L AS COL_107#1319L, COL_108#58L AS COL_108#1320L, COL_109#59 AS COL_109#1321, COL_11#60L AS COL_11#1322L, COL_110#61 AS COL_110#1323, COL_111#62 AS COL_111#1324, COL_112#63 AS COL_112#1325, COL_113#64L AS COL_113#1326L, COL_114#65 AS COL_114#1327, COL_115#66L AS COL_115#1328L, COL_116#67L AS COL_116#1329L, COL_117#68L AS COL_117#1330L, COL_118#69 AS COL_118#1331, COL_119#70 AS COL_119#1332, COL_12#71L AS COL_12#1333L, ... 388 more fields]
                     +- Join Inner, (COL_1#48L = COL_1#703L)
                        :- SubqueryAlias `watermark1`
                        :  +- EventTimeWatermark TIMESTAMP_COL_3#447: timestamp, interval 1 minutes
                        :     +- Project [COL_1#48L, COL_10#49, COL_100#50, COL_101#51L, COL_102#52L, COL_103#53L, COL_104#54L, COL_105#55, COL_106#56L, COL_107#57L, COL_108#58L, COL_109#59, COL_11#60L, COL_110#61, COL_111#62, COL_112#63, COL_113#64L, COL_114#65, COL_115#66L, COL_116#67L, COL_117#68L, COL_118#69, COL_119#70, COL_12#71L, ... 182 more fields]
                        :        +- SubqueryAlias `eventhub1`
                        :           +- Project [LIST#37.COL_1 AS COL_1#48L, LIST#37.COL_10 AS COL_10#49, LIST#37.COL_100 AS COL_100#50, LIST#37.COL_101 AS COL_101#51L, LIST#37.COL_102 AS COL_102#52L, LIST#37.COL_103 AS COL_103#53L, LIST#37.COL_104 AS COL_104#54L, LIST#37.COL_105 AS COL_105#55, LIST#37.COL_106 AS COL_106#56L, LIST#37.COL_107 AS COL_107#57L, LIST#37.COL_108 AS COL_108#58L, LIST#37.COL_109 AS COL_109#59, LIST#37.COL_11 AS COL_11#60L, LIST#37.COL_110 AS COL_110#61, LIST#37.COL_111 AS COL_111#62, LIST#37.COL_112 AS COL_112#63, LIST#37.COL_113 AS COL_113#64L, LIST#37.COL_114 AS COL_114#65, LIST#37.COL_115 AS COL_115#66L, LIST#37.COL_116 AS COL_116#67L, LIST#37.COL_117 AS COL_117#68L, LIST#37.COL_118 AS COL_118#69, LIST#37.COL_119 AS COL_119#70, LIST#37.COL_12 AS COL_12#71L, ... 180 more fields]
                        :              +- Project [jsontostructs(StructField(COL_1,LongType,true), StructField(COL_10,BooleanType,true), StructField(COL_100,BooleanType,true), StructField(COL_101,LongType,true), StructField(COL_102,LongType,true), StructField(COL_103,LongType,true), StructField(COL_104,LongType,true), StructField(COL_105,StringType,true), StructField(COL_106,LongType,true), StructField(COL_107,LongType,true), StructField(COL_108,LongType,true), StructField(COL_109,StringType,true), StructField(COL_11,LongType,true), StructField(COL_110,DoubleType,true), StructField(COL_111,DoubleType,true), StructField(COL_112,StringType,true), StructField(COL_113,LongType,true), StructField(COL_114,StringType,true), StructField(COL_115,LongType,true), StructField(COL_116,LongType,true), StructField(COL_117,LongType,true), StructField(COL_118,StringType,true), StructField(COL_119,DoubleType,true), StructField(COL_12,LongType,true), ... 173 more fields) AS LIST#37, body#27, partition#10, offset#11, sequenceNumber#12L, enqueuedTime#13, publisher#14, partitionKey#15, properties#16, systemProperties#17]
                        :                 +- Project [cast(body#9 as string) AS body#27, partition#10, offset#11, sequenceNumber#12L, enqueuedTime#13, publisher#14, partitionKey#15, properties#16, systemProperties#17]
                        :                    +- StreamingExecutionRelation org.apache.spark.sql.eventhubs.EventHubsSource@108ea531, [body#9, partition#10, offset#11, sequenceNumber#12L, enqueuedTime#13, publisher#14, partitionKey#15, properties#16, systemProperties#17]
                        +- SubqueryAlias `watermark2`
                           +- EventTimeWatermark TIMESTAMP_COL_3#1102: timestamp, interval 1 minutes
                              +- Project [COL_1#703L, COL_10#704, COL_100#705, COL_101#706L, COL_102#707L, COL_103#708L, COL_104#709L, COL_105#710, COL_106#711L, COL_107#712L, COL_108#713L, COL_109#714, COL_11#715L, COL_110#716, COL_111#717, COL_112#718, COL_113#719L, COL_114#720, COL_115#721L, COL_116#722L, COL_117#723L, COL_118#724, COL_119#725, COL_12#726L, ... 182 more fields]
                                 +- SubqueryAlias `eventhub2`
                                    +- Project [LIST#692.COL_1 AS COL_1#703L, LIST#692.COL_10 AS COL_10#704, LIST#692.COL_100 AS COL_100#705, LIST#692.COL_101 AS COL_101#706L, LIST#692.COL_102 AS COL_102#707L, LIST#692.COL_103 AS COL_103#708L, LIST#692.COL_104 AS COL_104#709L, LIST#692.COL_105 AS COL_105#710, LIST#692.COL_106 AS COL_106#711L, LIST#692.COL_107 AS COL_107#712L, LIST#692.COL_108 AS COL_108#713L, LIST#692.COL_109 AS COL_109#714, LIST#692.COL_11 AS COL_11#715L, LIST#692.COL_110 AS COL_110#716, LIST#692.COL_111 AS COL_111#717, LIST#692.COL_112 AS COL_112#718, LIST#692.COL_113 AS COL_113#719L, LIST#692.COL_114 AS COL_114#720, LIST#692.COL_115 AS COL_115#721L, LIST#692.COL_116 AS COL_116#722L, LIST#692.COL_117 AS COL_117#723L, LIST#692.COL_118 AS COL_118#724, LIST#692.COL_119 AS COL_119#725, LIST#692.COL_12 AS COL_12#726L, ... 180 more fields]
                                       +- Project [jsontostructs(StructField(COL_1,LongType,true), StructField(COL_10,BooleanType,true), StructField(COL_100,BooleanType,true), StructField(COL_101,LongType,true), StructField(COL_102,LongType,true), StructField(COL_103,LongType,true), StructField(COL_104,LongType,true), StructField(COL_105,StringType,true), StructField(COL_106,LongType,true), StructField(COL_107,LongType,true), StructField(COL_108,LongType,true), StructField(COL_109,StringType,true), StructField(COL_11,LongType,true), StructField(COL_110,DoubleType,true), StructField(COL_111,DoubleType,true), StructField(COL_112,StringType,true), StructField(COL_113,LongType,true), StructField(COL_114,StringType,true), StructField(COL_115,LongType,true), StructField(COL_116,LongType,true), StructField(COL_117,LongType,true), StructField(COL_118,StringType,true), StructField(COL_119,DoubleType,true), StructField(COL_12,LongType,true), ... 173 more fields) AS LIST#692, body#682, partition#665, offset#666, sequenceNumber#667L, enqueuedTime#668, publisher#669, partitionKey#670, properties#671, systemProperties#672]
                                          +- Project [cast(body#664 as string) AS body#682, partition#665, offset#666, sequenceNumber#667L, enqueuedTime#668, publisher#669, partitionKey#670, properties#671, systemProperties#672]
                                             +- StreamingExecutionRelation org.apache.spark.sql.eventhubs.EventHubsSource@5a86ea54, [body#664, partition#665, offset#666, sequenceNumber#667L, enqueuedTime#668, publisher#669, partitionKey#670, properties#671, systemProperties#672]

Caused by: org.apache.spark.sql.streaming.StreamingQueryException: Job aborted.

Jar versions: azure-storage-8.4.0, hadoop-azure-2.9.2
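
The observation that a temporary file is deleted and then touched again on the same thread can be checked mechanically against the log. The following is only a minimal sketch in Python, not part of the original job: the function name `deletes_followed_by_errors` and both regular expressions are hypothetical helpers written for the log layout shown above, and the sample input consists of two lines copied from that log.

```python
import re

# Matches the header lines of the log shown in the question:
# "<date> <time> <LEVEL> --- [<thread>] <logger> : <message>"
LINE = re.compile(r"^(\S+ \S+)\s+(\w+)\s+--- \[(.+?)\] (\S+) : (.*)$")
# Matches the DEBUG message that NativeAzureFileSystem logs after a delete.
DELETE = re.compile(r"Delete Successful for : (\S+)")


def deletes_followed_by_errors(log_text):
    """Return (thread, deleted_path, error_message) for every ERROR that a
    thread logs after that same thread deleted a .tmp file."""
    last_deleted = {}  # thread name -> most recently deleted .tmp path
    findings = []
    for line in log_text.splitlines():
        header = LINE.match(line)
        if header is None:
            continue  # stack-trace frames and blank lines carry no header
        _timestamp, level, thread, _logger, message = header.groups()
        deleted = DELETE.match(message)
        if deleted and deleted.group(1).endswith(".tmp"):
            last_deleted[thread] = deleted.group(1)
        elif level == "ERROR" and thread in last_deleted:
            findings.append((thread, last_deleted[thread], message))
    return findings


# Two header lines copied from the log in the question.
SAMPLE_LOG = "\n".join([
    "2021-02-10 20:28:55.633 DEBUG --- [Executor task launch worker for task 15] "
    "org.apache.hadoop.fs.azure.NativeAzureFileSystem : Delete Successful for : "
    "wasb://mosaic-k8s-hdi-container@mosaick8shdistorage.blob.core.windows.net/"
    "k8s_performance/streamCheckpoint/c0c92f4f-2708-412c-82c1-053b32ba63c9pot5/"
    "MYSQL1/state/0/2/.1.delta.3554a10d-9d96-4ddb-84f1-a49334757dc9.TID15.tmp",
    "2021-02-10 20:28:55.634 ERROR --- [Executor task launch worker for task 15] "
    "org.apache.spark.internal.Logging$class : Error in TaskCompletionListener",
])

if __name__ == "__main__":
    for thread, path, error in deletes_followed_by_errors(SAMPLE_LOG):
        print(f"{thread}: deleted {path}, then logged: {error}")
```

Running this over the full executor log would list each thread that reported an error after removing one of its own temporary delta files. That pattern would be consistent with the `NullPointerException` in `NativeAzureFsOutputStream.write` being raised during cleanup of an aborted state store, rather than during the normal write path.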
