I have NYC taxi data, and I'm trying to use GeoSpark's KNN search to match each taxi point to its nearest NYC POI.

NYC taxi data:
+--------------------+--------+--------------------+---------------------+-------------------+------------------+-------------------+------------------+------------------+------------+-------------------+---------------+
| point|VendorID|lpep_pickup_datetime|Lpep_dropoff_datetime| Pickup_longitude| Pickup_latitude| Dropoff_longitude| Dropoff_latitude| Total_amount|Payment_type| Trip_distance|Passenger_count|
+--------------------+--------+--------------------+---------------------+-------------------+------------------+-------------------+------------------+------------------+------------+-------------------+---------------+
|POINT (-74.008355...| CMT| 2009-04-29 01:43:04| 2009-04-29 01:49:30|-74.008354999999995|40.735469999999999|-73.991578000000004|40.759819999999998|8.8599999999999994| Credit| 2.2999999999999998| 1|
|POINT (-73.967964...| CMT| 2009-01-11 01:13:42| 2009-01-11 01:39:51|-73.967963999999995| 40.801575|-73.999008000000003|40.720258999999999|24.600000000000001| Cash| 8.9000000000000004| 1|
|POINT (-74.007882...| CMT| 2013-02-11 14:53:25| 2013-02-11 15:08:51|-74.007881999999995|40.711872999999997|-74.000084999999999| 40.757424|18.850000000000001| CRD| 3.5| 2|
|POINT (-73.972693...| CMT| 2009-04-23 12:08:30| 2009-04-23 12:26:16|-73.972693000000007| 40.753532|-73.989721000000003|40.760554999999997|9.6999999999999993| Cash| 1.1000000000000001| 1|
|POINT (-73.999168...| VTS| 2009-04-11 02:38:00| 2009-04-11 02:44:00|-73.999167999999997| 40.728023|-73.999094999999997|40.734212999999997|6.4000000000000004| Credit|0.56999999999999995| 2|
NYC POI data (police department):
+--------------------+-------+---------+---------+---+-------+--------+---+-------+--------------------+--------------------+----------+------+--------+-------+--------------------+
| geometry|SAFTYPE|SEGMENTID|COMPLEXID|SOS|PLACEID|FACI_DOM|BIN|BOROUGH| CREATED| MODIFIED|FACILITY_T|SOURCE| B7SC|PRI_ADD| NAME|
+--------------------+-------+---------+---------+---+-------+--------+---+-------+--------------------+--------------------+----------+------+--------+-------+--------------------+
|POINT (-73.931493...| N| 193927| 401| 1|1031153| 8| 0| 1|04/12/2018 12:00:...|03/12/2020 12:00:...| 6| DCP|11095502| 0|HRD RAMP TO GWB O...|
|POINT (-74.196729...| N| 321726| 0| 1| 19954| 8| 0| 5|03/15/2008 12:00:...|03/16/2020 12:00:...| 6| DCP|59703501| 0| GOETHALS|
|POINT (-73.958021...| N| 138896| 0| 1|1026908| 8| 0| 1|10/20/2015 12:00:...|03/16/2020 12:00:...| 6| DoITT|19792015| 0|KOCH BR PEDESTRIA...|
|POINT (-73.950616...| N| 139388| 0| 1|1026909| 8| 0| 4|10/20/2015 12:00:...|03/16/2020 12:00:...| 6| DoITT|49734017| 0|KOCH BR PEDESTRIA...|
|POINT (-73.816783...| N| 179357| 0| 2|1031297| 8| 0| 4|04/13/2018 12:00:...|04/19/2018 12:00:...| 6| DCP|40074101| 0|14 AV OV BELT CRO...|
|POINT (-73.826157...| N| 91318| 0| 1|1031312| 8| 0| 4|04/13/2018 12:00:...|04/19/2018 12:00:...| 6| DCP|40078101| 0|UNION ST OVER LIR...|
|POINT (-73.816116...| N| 91681| 0| 2|1031316| 8| 0| 4|04/13/2018 12:00:...|04/19/2018 12:00:...| 6| DCP|42319006| 0|149 ST BR OVR LIR...|
Creating a SpatialRDD for the POI DataFrame:

```scala
var nyc_poi_geoRDD = Adapter.toSpatialRdd(nyc_poi_geo, "geometry")
nyc_poi_geoRDD.analyze()
val buildOnSpatialPartitionedRDD = false // set to true only when running a join query
nyc_poi_geoRDD.buildIndex(IndexType.RTREE, buildOnSpatialPartitionedRDD)
```
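For context, here is a minimal sketch of how the two DataFrames above could be prepared with GeoSpark SQL. The file names and raw column layout are assumptions (the taxi CSV has `Pickup_longitude`/`Pickup_latitude` columns, and the POI CSV carries its geometry as WKT); `GeoSparkSQLRegistrator.registerAll` and the `ST_Point`/`ST_GeomFromWKT` functions are per GeoSpark 1.2+:

```scala
import org.apache.spark.sql.SparkSession
import org.datasyslab.geosparksql.utils.GeoSparkSQLRegistrator

val spark = SparkSession.builder().appName("taxi-poi-knn").getOrCreate()
GeoSparkSQLRegistrator.registerAll(spark) // register ST_* functions

// Taxi pickups: build a JTS point from the pickup coordinates.
spark.read.option("header", "true").csv("green_tripdata.csv") // hypothetical path
  .createOrReplaceTempView("taxi_raw")
val taxi_df = spark.sql(
  """SELECT ST_Point(CAST(Pickup_longitude AS Decimal(24,20)),
    |                CAST(Pickup_latitude  AS Decimal(24,20))) AS point, *
    |FROM taxi_raw""".stripMargin)

// POIs: parse the WKT geometry column into a JTS geometry.
spark.read.option("header", "true").csv("nyc_poi.csv") // hypothetical path
  .createOrReplaceTempView("poi_raw")
val nyc_poi_geo = spark.sql(
  "SELECT ST_GeomFromWKT(geometry) AS geometry, NAME FROM poi_raw")
```

`Adapter.toSpatialRdd(nyc_poi_geo, "geometry")` then works as in the snippet above, since the `geometry` column is a real JTS geometry rather than a string.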
I defined this UDF:

```scala
val knn = udf((P: Geometry) => KNNQuery.SpatialKnnQuery(nyc_poi_geoRDD, P, K, usingIndex).toArray.map(_.toString))
```
Calling `taxi_df.withColumn("POI2", knn('point)).show()` returns this error:

```
20/09/27 03:08:19 WARN TaskSetManager: Lost task 0.0 in stage 31.0 (TID 2653, executor 112): org.apache.spark.SparkException: Failed to execute user defined function($anonfun$1: (geometry) => array)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:253)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:836)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:836)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:49)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:49)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.SparkException: This RDD lacks a SparkContext. It could happen in the following cases:
(1) RDD transformations and actions are NOT invoked by the driver, but inside of other transformations; for example, rdd1.map(x => rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063.
(2) When a Spark Streaming job recovers from checkpoint, this exception will be hit if a reference to an RDD not defined by the streaming job is used in DStream operations. For more information, See SPARK-13758.
```
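The `Caused by` line points at SPARK-5063: the UDF closure captures `nyc_poi_geoRDD`, and an RDD (which is what `KNNQuery.SpatialKnnQuery` operates on) cannot be used from inside executor-side code such as a UDF. One workaround is to move the POI index out of the RDD entirely. Below is a minimal sketch, assuming the POI set fits in driver memory, with `spark` and `K` as in the question; it uses JTS's `STRtree` k-nearest-neighbour API (`nearestNeighbour(Envelope, Object, ItemDistance, int)`, available in JTS 1.16+ under `org.locationtech.jts`; older GeoSpark versions bundle `com.vividsolutions.jts` instead, so the imports may need adjusting):

```scala
import org.apache.spark.sql.functions.udf
import org.locationtech.jts.geom.Geometry
import org.locationtech.jts.index.strtree.{GeometryItemDistance, STRtree}

// Collect the POI geometries to the driver (assumes they fit in memory)
// and pack them into an STRtree that can be broadcast to the executors.
val tree = new STRtree()
nyc_poi_geoRDD.rawSpatialRDD.rdd.collect().foreach { g =>
  tree.insert(g.getEnvelopeInternal, g)
}
tree.build() // freeze the tree before serializing/broadcasting it
val treeBc = spark.sparkContext.broadcast(tree)

// The UDF now queries a plain in-memory index, so no RDD is captured
// in the closure and SPARK-5063 no longer applies.
val knn = udf { (p: Geometry) =>
  treeBc.value
    .nearestNeighbour(p.getEnvelopeInternal, p, new GeometryItemDistance, K)
    .map(_.asInstanceOf[Geometry].toString)
}

taxi_df.withColumn("POI2", knn('point)).show()
```

Note that `KNNQuery.SpatialKnnQuery` is designed to be called from the driver for a single query point; for a per-row nearest-neighbour match over a whole DataFrame, a broadcast index (or a distance-based join followed by a window ranking) is the usual pattern.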