Geospatial kNN search in Spark

7xzttuei posted on 2021-05-24 in Spark

I have NYC taxi data, and I'm trying to use GeoSpark to run a kNN search against NYC POIs, matching every taxi point to its closest POI point.
NYC taxi data:

```
+--------------------+--------+--------------------+---------------------+-------------------+------------------+-------------------+------------------+------------------+------------+-------------------+---------------+
|               point|VendorID|lpep_pickup_datetime|Lpep_dropoff_datetime|   Pickup_longitude|   Pickup_latitude|  Dropoff_longitude|  Dropoff_latitude|      Total_amount|Payment_type|      Trip_distance|Passenger_count|
+--------------------+--------+--------------------+---------------------+-------------------+------------------+-------------------+------------------+------------------+------------+-------------------+---------------+
|POINT (-74.008355...|     CMT| 2009-04-29 01:43:04|  2009-04-29 01:49:30|-74.008354999999995|40.735469999999999|-73.991578000000004|40.759819999999998|8.8599999999999994|      Credit| 2.2999999999999998|              1|
|POINT (-73.967964...|     CMT| 2009-01-11 01:13:42|  2009-01-11 01:39:51|-73.967963999999995|         40.801575|-73.999008000000003|40.720258999999999|24.600000000000001|        Cash| 8.9000000000000004|              1|
|POINT (-74.007882...|     CMT| 2013-02-11 14:53:25|  2013-02-11 15:08:51|-74.007881999999995|40.711872999999997|-74.000084999999999|         40.757424|18.850000000000001|         CRD|                3.5|              2|
|POINT (-73.972693...|     CMT| 2009-04-23 12:08:30|  2009-04-23 12:26:16|-73.972693000000007|         40.753532|-73.989721000000003|40.760554999999997|9.6999999999999993|        Cash| 1.1000000000000001|              1|
|POINT (-73.999168...|     VTS| 2009-04-11 02:38:00|  2009-04-11 02:44:00|-73.999167999999997|         40.728023|-73.999094999999997|40.734212999999997|6.4000000000000004|      Credit|0.56999999999999995|              2|
```

NYC POI data:

```
+--------------------+-------+---------+---------+---+-------+--------+---+-------+--------------------+--------------------+----------+------+--------+-------+--------------------+
|            geometry|SAFTYPE|SEGMENTID|COMPLEXID|SOS|PLACEID|FACI_DOM|BIN|BOROUGH|             CREATED|            MODIFIED|FACILITY_T|SOURCE|    B7SC|PRI_ADD|                NAME|
+--------------------+-------+---------+---------+---+-------+--------+---+-------+--------------------+--------------------+----------+------+--------+-------+--------------------+
|POINT (-73.931493...|      N|   193927|      401|  1|1031153|       8|  0|      1|04/12/2018 12:00:...|03/12/2020 12:00:...|         6|   DCP|11095502|      0|HRD RAMP TO GWB O...|
|POINT (-74.196729...|      N|   321726|        0|  1|  19954|       8|  0|      5|03/15/2008 12:00:...|03/16/2020 12:00:...|         6|   DCP|59703501|      0|            GOETHALS|
|POINT (-73.958021...|      N|   138896|        0|  1|1026908|       8|  0|      1|10/20/2015 12:00:...|03/16/2020 12:00:...|         6| DoITT|19792015|      0|KOCH BR PEDESTRIA...|
|POINT (-73.950616...|      N|   139388|        0|  1|1026909|       8|  0|      4|10/20/2015 12:00:...|03/16/2020 12:00:...|         6| DoITT|49734017|      0|KOCH BR PEDESTRIA...|
|POINT (-73.816783...|      N|   179357|        0|  2|1031297|       8|  0|      4|04/13/2018 12:00:...|04/19/2018 12:00:...|         6|   DCP|40074101|      0|14 AV OV BELT CRO...|
|POINT (-73.826157...|      N|    91318|        0|  1|1031312|       8|  0|      4|04/13/2018 12:00:...|04/19/2018 12:00:...|         6|   DCP|40078101|      0|UNION ST OVER LIR...|
|POINT (-73.816116...|      N|    91681|        0|  2|1031316|       8|  0|      4|04/13/2018 12:00:...|04/19/2018 12:00:...|         6|   DCP|42319006|      0|149 ST BR OVR LIR...|
```

Creating a SpatialRDD from the POI DataFrame:

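```scala
// GeoSpark 1.x package names
import org.datasyslab.geospark.enums.IndexType
import org.datasyslab.geosparksql.utils.Adapter

// Convert the POI DataFrame's "geometry" column into a SpatialRDD
val nyc_poi_geoRDD = Adapter.toSpatialRdd(nyc_poi_geo, "geometry")
nyc_poi_geoRDD.analyze()

val buildOnSpatialPartitionedRDD = false // set to true only when running a join query
nyc_poi_geoRDD.buildIndex(IndexType.RTREE, buildOnSpatialPartitionedRDD)
```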
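The R-tree only speeds up the search; the answer a kNN query should give is simply the `K` geometries closest to the query point. As a minimal sketch of those semantics (not GeoSpark's implementation), here is a brute-force version over an in-memory collection that keeps a bounded max-heap of the `K` best candidates. The `Pt` type, `KnnSketch.kNearest`, and the use of planar distance on raw longitude/latitude degrees are all assumptions for illustration.

```scala
import scala.collection.mutable

// Hypothetical point type for this sketch (not a GeoSpark or JTS class).
final case class Pt(lon: Double, lat: Double)

object KnnSketch {
  // Squared planar distance on raw lon/lat degrees; good enough for ranking
  // nearby points, but not a distance in meters.
  private def dist2(a: Pt, b: Pt): Double = {
    val dx = a.lon - b.lon
    val dy = a.lat - b.lat
    dx * dx + dy * dy
  }

  // Returns up to k points from `pois` closest to `query`, nearest first.
  def kNearest(pois: Seq[Pt], query: Pt, k: Int): List[Pt] = {
    require(k > 0, "k must be positive")
    // Max-heap keyed on distance: its head is the farthest candidate kept so far.
    val heap = mutable.PriorityQueue.empty[(Double, Pt)](
      Ordering.by[(Double, Pt), Double](_._1))
    for (p <- pois) {
      val d = dist2(p, query)
      if (heap.size < k) heap.enqueue((d, p))
      else if (d < heap.head._1) {
        heap.dequeue()       // drop the current farthest candidate
        heap.enqueue((d, p))
      }
    }
    // The heap holds the k best in arbitrary order; sort them nearest first.
    heap.toList.sortBy(_._1).map(_._2)
  }
}
```

For example, `KnnSketch.kNearest(pois, Pt(-73.99, 40.73), 3)` would return the three points in `pois` nearest to that pickup location. The heap keeps memory at `O(K)` per query, while a spatial index such as the R-tree avoids scanning every POI.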

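I created this UDF:

```scala
import org.apache.spark.sql.functions.udf
import org.datasyslab.geospark.spatialOperator.KNNQuery

// K and usingIndex are not shown in the post
val knn = udf((P: Geometry) => KNNQuery.SpatialKnnQuery(nyc_poi_geoRDD, P, K, usingIndex).toArray.map(_.toString))
```

`taxi_df.withColumn("POI2", knn('point)).show()` returns this error: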

```
20/09/27 03:08:19 WARN TaskSetManager: Lost task 0.0 in stage 31.0 (TID 2653, executor 112): org.apache.spark.SparkException: Failed to execute user defined function($anonfun$1: (geometry) => array)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:253)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:836)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:836)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:49)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:49)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.SparkException: This RDD lacks a SparkContext. It could happen in the following cases:
(1) RDD transformations and actions are NOT invoked by the driver, but inside of other transformations; for example, rdd1.map(x => rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063.
(2) When a Spark Streaming job recovers from checkpoint, this exception will be hit if a reference to an RDD not defined by the streaming job is used in DStream operations. For more information, See SPARK-13758.
```
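Case (1) matches the UDF: its closure captures `nyc_poi_geoRDD`, and `KNNQuery.SpatialKnnQuery` runs an operation on that RDD from inside a task on an executor, which Spark does not allow. One common workaround (an assumption, not something this thread confirms) is to collect the POIs to the driver, which only works if they fit in memory, wrap them in a plain serializable structure, broadcast it, and query it inside the UDF. The Spark-free core of that idea is sketched below; `Poi`, `NearestPoiLookup`, `collectedPois`, and the Spark wiring in the trailing comment are hypothetical.

```scala
// Hypothetical POI record; in Spark it would be built from rows collected from nyc_poi_geo.
final case class Poi(name: String, lon: Double, lat: Double)

// A plain local lookup. Unlike an RDD, it can be captured by a UDF closure or
// broadcast, because querying it does not launch a Spark job.
final class NearestPoiLookup(pois: Array[Poi]) extends Serializable {
  require(pois.nonEmpty, "need at least one POI")

  // Squared planar distance on raw lon/lat degrees (ranking only, not meters).
  private def dist2(p: Poi, lon: Double, lat: Double): Double = {
    val dx = p.lon - lon
    val dy = p.lat - lat
    dx * dx + dy * dy
  }

  // Brute-force scan; a JTS STRtree could replace it if the POI set is large.
  def nearest(lon: Double, lat: Double): Poi =
    pois.minBy(p => dist2(p, lon, lat))
}

// Assumed Spark wiring (not executed here); `collectedPois` is hypothetical and
// `point` is the taxi geometry column from the question:
//   val bc = spark.sparkContext.broadcast(new NearestPoiLookup(collectedPois))
//   val nearestPoi = udf((p: Geometry) =>
//     bc.value.nearest(p.getCoordinate.x, p.getCoordinate.y).name)
//   taxi_df.withColumn("POI2", nearestPoi('point)).show()
```

The design point is that the UDF closure references only `bc`, a broadcast of ordinary data, rather than `nyc_poi_geoRDD`, so no RDD operation is attempted inside a task.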
