Error when writing to a Hive table in PySpark

iyfjxgzm · posted 2021-07-12 in Spark

I am appending the result of a spark.sql() query to a Hive table and it fails, and I don't know why. I have tried partitioning the data, but it still fails. I can write to a DataFrame, and I can write it out to a directory, but as soon as I try to write to the Hive table it fails.
Here is a sample of my code:

gps_supplement_df = spark.sql("""WITH gps_traces AS(
    SELECT 
    gtrips.trip_id
    , to_date(gtrips.trip_date) as trip_date
    , gtrips.fleet_id
    , vin.vehicle_vin
    , gtrips.driver_id
    , gtrips.trip_distance_travelled
    , gtrips.trip_duration
    , to_timestamp(gdata.trip_timestamp, "yyyy-MM-dd'T'HH:mm:ss") as gps_timestamp
    , rank() over 
        (partition by gtrips.trip_id 
            order by to_timestamp(gdata.trip_timestamp, "yyyy-MM-dd'T'HH:mm:ss") asc) 
        as timestamp_rank
    , gdata.latitude
    , gdata.longitude
    , gdata.postcode
    , CAST(ST_Point(CAST(gdata.longitude AS DECIMAL(16,12)), CAST(gdata.latitude AS DECIMAL(16,12))) AS STRING) AS geometry
    FROM 
        cms.gps_trips gtrips
    INNER JOIN
        cms.gps_data gdata
        ON gtrips.trip_id = gdata.trip_id
    INNER JOIN
        (
            SELECT
                DISTINCT --why are there duplicates?
                    devices.vehicle_id
                    , devices.vehicle_vin
                    , devices.data_effective_timestamp
            FROM
                cms.devices devices
            INNER JOIN
                 (
                    SELECT
                        vehicle_id
                        , max(data_effective_timestamp) as data_effective_timestamp
                    FROM
                        cms.devices
                    GROUP BY
                        vehicle_id
                ) max_data_effective
                ON devices.vehicle_id = max_data_effective.vehicle_id
                AND devices.data_effective_timestamp = max_data_effective.data_effective_timestamp
        ) vin
        ON gtrips.vehicle_id = vin.vehicle_id
    WHERE 
    --Rolling 7 days
        to_date(gtrips.trip_date) between '2020-01-01' and '2020-01-31'
    )
-- Spark table incorporated---    
, na_geom as (
    SELECT
        ST_GeomFromWKT(geometry) as geometry
        , NAME_LONG
    FROM
        na_df 
    )

, gps_data_adj as(
    SELECT 
    gps.trip_id
    , gps.trip_date
    , gps.fleet_id
    , gps.gps_timestamp
    , gps.latitude
    , gps.longitude
    , gps.postcode
    , gps.trip_distance_travelled
    , gps.trip_duration
    , gps.geometry
        , ACOS(
            SIN(RADIANS(gps.latitude))*SIN(RADIANS(gps1.latitude)) + 
            COS(RADIANS(gps.latitude))*COS(RADIANS(gps1.latitude))*COS(RADIANS(gps1.longitude) - RADIANS(gps.longitude))
        )*3958.76 AS COSINES_DISTANCE
        , ASIN(
            SQRT(
                POWER(SIN((RADIANS(gps.latitude) - RADIANS(gps1.latitude))/2), 2) +
                COS(RADIANS(gps.latitude))*COS(RADIANS(gps1.latitude))*
                POWER(SIN((RADIANS(gps.longitude) - RADIANS(gps1.longitude))/2), 2)
            )
        )*3958.76*2 AS HAVERSINE_DISTANCE 
        , (UNIX_TIMESTAMP(gps1.gps_timestamp) - UNIX_TIMESTAMP(gps.gps_timestamp)) AS GPS_INTERVAL
    FROM
        gps_traces gps
    LEFT JOIN
        gps_traces gps1
            ON gps.trip_id = gps1.trip_id
            AND gps.timestamp_rank = (gps1.timestamp_rank - 1)
    )
, gps_data_wona as (

    select
        gps_data_adj.trip_id
        , gps_data_adj.trip_date
        , gps_data_adj.fleet_id
        , gps_data_adj.gps_timestamp
        , gps_data_adj.latitude
        , gps_data_adj.longitude
        , gps_data_adj.postcode
        , gps_data_adj.trip_distance_travelled
        , gps_data_adj.trip_duration
        , gps_data_adj.geometry
        , trip_summary.TRIP_HAVERSINE_DISTANCE
        , trip_summary.TRIP_GPS_DURATION
        , gps_data_adj.HAVERSINE_DISTANCE
        , gps_data_adj.GPS_INTERVAL
        , gps_data_adj.HAVERSINE_DISTANCE/trip_summary.TRIP_HAVERSINE_DISTANCE AS HAVERSINE_DISTANCE_FRACTION
        , gps_data_adj.GPS_INTERVAL/trip_summary.TRIP_GPS_DURATION AS GPS_INTERVAL_FRACTION
        , (gps_data_adj.HAVERSINE_DISTANCE/trip_summary.TRIP_HAVERSINE_DISTANCE)*gps_data_adj.trip_distance_travelled AS HAVERSINE_DISTANCE_ADJ
        , (gps_data_adj.GPS_INTERVAL/trip_summary.TRIP_GPS_DURATION)*gps_data_adj.trip_duration AS GPS_INTERVAL_ADJ
    FROM
        gps_data_adj
    INNER JOIN
        (
            SELECT
                trip_id 
                , sum(COSINES_DISTANCE) as TRIP_COSINES_DISTANCE
                , sum(HAVERSINE_DISTANCE) as TRIP_HAVERSINE_DISTANCE
                , sum(GPS_INTERVAL) AS TRIP_GPS_DURATION
            FROM
                gps_data_adj
            GROUP BY
                trip_id
        ) trip_summary
        ON gps_data_adj.trip_id = trip_summary.trip_id
    )

select 
 STRING(gps_data_wona.trip_id)
, STRING(gps_data_wona.trip_date)
, STRING(gps_data_wona.fleet_id)
, STRING(gps_data_wona.gps_timestamp)
, STRING(gps_data_wona.latitude)
, STRING(gps_data_wona.longitude)
, STRING(gps_data_wona.postcode)
, STRING(gps_data_wona.trip_distance_travelled)
, STRING(gps_data_wona.trip_duration)
, STRING(gps_data_wona.TRIP_HAVERSINE_DISTANCE)
, STRING(gps_data_wona.TRIP_GPS_DURATION)
, STRING(gps_data_wona.HAVERSINE_DISTANCE)
, STRING(gps_data_wona.GPS_INTERVAL)
, STRING(gps_data_wona.HAVERSINE_DISTANCE_FRACTION)
, STRING(gps_data_wona.GPS_INTERVAL_FRACTION)
, STRING(gps_data_wona.HAVERSINE_DISTANCE_ADJ)
, STRING(gps_data_wona.GPS_INTERVAL_ADJ)
, CASE
            WHEN gps_data_wona.postcode RLIKE "[A-Z]{1}[0-9]{1}[A-Z]{1}"
                THEN "Canada"
            ELSE
                CASE
                    WHEN gps_data_wona.latitude >= 33.62116425145008 OR gps_data_wona.longitude <= -119.9522148603999 OR gps_data_wona.longitude >= -85.11718096959993
                    THEN "United States"
                    ELSE na_geom.NAME_LONG
                END
        END AS COUNTRY
    FROM
        gps_data_wona
        LEFT JOIN
        na_geom
            ON ST_Intersects(gps_data_wona.geometry, na_geom.geometry)
        --Filters
        AND gps_data_wona.postcode NOT RLIKE "[A-Z]{1}[0-9]{1}[A-Z]{1}"
        AND gps_data_wona.longitude > -119.9522148603999
        AND gps_data_wona.longitude < -85.11718096959993
        AND gps_data_wona.latitude < 33.62116425145008       
                """) 

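A sample row from the resulting DataFrame: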
 +--------------------+----------+-------------------+----------------+-----------------+--------+-----------------------+-------------+-----------------------+-----------------+-------------------+------------+---------------------------+---------------------+----------------------+------------------+-------------+
|             trip_id| trip_date|      gps_timestamp|        latitude|        longitude|postcode|trip_distance_travelled|trip_duration|TRIP_HAVERSINE_DISTANCE|TRIP_GPS_DURATION| HAVERSINE_DISTANCE|GPS_INTERVAL|HAVERSINE_DISTANCE_FRACTION|GPS_INTERVAL_FRACTION|HAVERSINE_DISTANCE_ADJ|  GPS_INTERVAL_ADJ|      COUNTRY|
+--------------------+----------+-------------------+----------------+-----------------+--------+-----------------------+-------------+-----------------------+-----------------+-------------------+------------+---------------------------+---------------------+----------------------+------------------+-------------+
|2021-02-18-10007-...|2021-02-18|2021-02-18 13:11:51|36.4034576416016|-99.5645217895508|   73801|                  64.83|       133.97|      64.17130254777096|             7681| 0.7799527634659036|          44|       0.012154229889369697|  0.00572842077854446|    0.7879587237278374|0.7674365317016013|United States|

gps_supplement_df.write.insertInto("sb_group.TEST_SUPPLEMENT",overwrite=False)
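Note that the SQL above only actually executes when an action such as this write runs, so a failure at this point can still originate in the query itself. Separately, DataFrameWriter.insertInto matches columns to the target table by position rather than by name, so the SELECT order has to line up exactly with the schema of sb_group.TEST_SUPPLEMENT. A minimal sanity check (a sketch, using only the table name from the post):

# insertInto is position-based: compare column count and order first.
target_cols = spark.table("sb_group.TEST_SUPPLEMENT").columns
print(target_cols)                # column order in the Hive table
print(gps_supplement_df.columns)  # must match the above positionally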

The error message:

An error was encountered:
An error occurred while calling o361.insertInto.
: org.apache.spark.SparkException: Job aborted.
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:198)
    at org.apache.spark.sql.hive.execution.SaveAsHiveFile$class.saveAsHiveFile(SaveAsHiveFile.scala:86)
    at org.apache.spark.sql.hive.execution.InsertIntoHiveTable.saveAsHiveFile(InsertIntoHiveTable.scala:66)
    at org.apache.spark.sql.hive.execution.InsertIntoHiveTable.processInsert(InsertIntoHiveTable.scala:195)
    at org.apache.spark.sql.hive.execution.InsertIntoHiveTable.run(InsertIntoHiveTable.scala:99)
    at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
    at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
    at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
    at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:668)
    at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:668)
    at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:668)
    at org.apache.spark.sql.DataFrameWriter.insertInto(DataFrameWriter.scala:325)
    at org.apache.spark.sql.DataFrameWriter.insertInto(DataFrameWriter.scala:311)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 8 in stage 41.0 failed 4 times, most recent failure: Lost task 8.3 in stage 41.0 (TID 5440, cvglpcdh11.td.afg, executor 79): java.lang.ClassCastException: org.apache.spark.unsafe.types.UTF8String cannot be cast to org.apache.spark.sql.catalyst.util.ArrayData
    at org.apache.spark.sql.geosparksql.expressions.ST_Intersects.eval(Predicates.scala:74)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificPredicate.eval(Unknown Source)
    at org.apache.spark.sql.execution.joins.BroadcastNestedLoopJoinExec$$anonfun$org$apache$spark$sql$execution$joins$BroadcastNestedLoopJoinExec$$boundCondition$1.apply(BroadcastNestedLoopJoinExec.scala:87)
    at org.apache.spark.sql.execution.joins.BroadcastNestedLoopJoinExec$$anonfun$org$apache$spark$sql$execution$joins$BroadcastNestedLoopJoinExec$$boundCondition$1.apply(BroadcastNestedLoopJoinExec.scala:87)
    at org.apache.spark.sql.execution.joins.BroadcastNestedLoopJoinExec$$anonfun$3$$anon$1.findNextMatch(BroadcastNestedLoopJoinExec.scala:147)
    at org.apache.spark.sql.execution.joins.BroadcastNestedLoopJoinExec$$anonfun$3$$anon$1.hasNext(BroadcastNestedLoopJoinExec.scala:164)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage66.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11$$anon$1.hasNext(WholeStageCodegenExec.scala:624)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:232)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:170)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:169)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:121)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$11.apply(Executor.scala:407)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1363)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:413)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1890)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1878)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1877)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1877)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:929)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:929)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:929)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2111)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2060)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2049)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:740)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2081)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:167)
    ... 34 more
Caused by: java.lang.ClassCastException: org.apache.spark.unsafe.types.UTF8String cannot be cast to org.apache.spark.sql.catalyst.util.ArrayData
    at org.apache.spark.sql.geosparksql.expressions.ST_Intersects.eval(Predicates.scala:74)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificPredicate.eval(Unknown Source)
    at org.apache.spark.sql.execution.joins.BroadcastNestedLoopJoinExec$$anonfun$org$apache$spark$sql$execution$joins$BroadcastNestedLoopJoinExec$$boundCondition$1.apply(BroadcastNestedLoopJoinExec.scala:87)
    at org.apache.spark.sql.execution.joins.BroadcastNestedLoopJoinExec$$anonfun$org$apache$spark$sql$execution$joins$BroadcastNestedLoopJoinExec$$boundCondition$1.apply(BroadcastNestedLoopJoinExec.scala:87)
    at org.apache.spark.sql.execution.joins.BroadcastNestedLoopJoinExec$$anonfun$3$$anon$1.findNextMatch(BroadcastNestedLoopJoinExec.scala:147)
    at org.apache.spark.sql.execution.joins.BroadcastNestedLoopJoinExec$$anonfun$3$$anon$1.hasNext(BroadcastNestedLoopJoinExec.scala:164)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage66.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11$$anon$1.hasNext(WholeStageCodegenExec.scala:624)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:232)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:170)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:169)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:121)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$11.apply(Executor.scala:407)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1363)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:413)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    ... 1 more

Traceback (most recent call last):
  File "/app/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/lib/spark/python/pyspark/sql/readwriter.py", line 747, in insertInto
    self._jwrite.mode("overwrite" if overwrite else "append").insertInto(tableName)
  File "/app/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/app/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/lib/spark/python/pyspark/sql/utils.py", line 63, in deco
    return f(*a,**kw)
  File "/app/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o361.insertInto.
    (the Java stack trace is identical to the one shown above)
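Both stack traces bottom out at the same frame: java.lang.ClassCastException: org.apache.spark.unsafe.types.UTF8String cannot be cast to org.apache.spark.sql.catalyst.util.ArrayData, thrown from org.apache.spark.sql.geosparksql.expressions.ST_Intersects.eval. In the query, the gps_traces CTE casts the point to a string (CAST(ST_Point(...) AS STRING)), and that string column is what reaches ST_Intersects in the final LEFT JOIN, whereas GeoSpark expects a geometry value there. A sketch of a possible fix, assuming the stored string is WKT that ST_GeomFromWKT can parse (the same conversion the na_geom CTE already applies to na_df):

# Rewrite only the final join condition so ST_Intersects receives a geometry
# on both sides (hypothetical fix; the rest of the query is unchanged):
#
#   LEFT JOIN na_geom
#       ON ST_Intersects(ST_GeomFromWKT(gps_data_wona.geometry), na_geom.geometry)
#
# To confirm the failure comes from evaluating the query rather than from the
# Hive write itself, force full evaluation without writing anything:
gps_supplement_df.explain(True)  # plan shows a BroadcastNestedLoopJoin whose condition evaluates ST_Intersects
gps_supplement_df.count()        # forces evaluation; should reproduce the ClassCastException if the predicate is at fault

Alternatively, dropping the CAST(... AS STRING) in gps_traces and keeping the column as a geometry end to end would also avoid the cast, as long as the geometry column is excluded (or converted back to a string) before the final write, since the Hive table presumably stores it as a string.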
