PySpark reduce throws Py4JJavaError

k4emjkb1 asked on 2021-05-24 in Spark

I am a beginner with Spark, Hadoop, and the big data ecosystem in general.
I am using Spark 3.0.1, Hadoop 2.7, and Python 3.6.
I have a .json file (the JSON below is only an excerpt of the actual file):

[{"number":122,"name":"122 - LOWER RIVER TCE / ELLIS ST","address":"Lower River Tce / Ellis St","latitude":-27.482279,"longitude":153.028723},{"number":91,"name":"91 - MAIN ST / DARRAGH ST","address":"Main St / Darragh St","latitude":-27.47059,"longitude":153.036046}]

I want to parse it, do some data preparation, and then cluster it with KMeans.
Here is what I have done so far:

import findspark
findspark.init()

from pyspark import SparkContext, SparkConf
from pyspark.ml.clustering import KMeans
from pyspark.sql import SQLContext
from pyspark.ml.feature import VectorAssembler
import numpy as np
from pyspark.ml.evaluation import ClusteringEvaluator
import pandas as pd
import matplotlib.pyplot as plt

conf = SparkConf().setAppName('MyApp')
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)

FEATURES_COL = ['latitude', 'longitude']
path = 'hdfs:/public/bikes/Brisbane_CityBike.json'
rdd = sc.textFile(path)

rdd = rdd.flatMap(lambda line: line.split('},{'))

rdd = rdd.map(lambda row: row.replace('[', ""))

rdd = rdd.map(lambda row: row.replace('{', ""))

rdd = rdd.map(lambda row: "{"+row+"}")

import json
rdd = rdd.map(lambda row: json.loads(row))

rdd = rdd.map(lambda row: (row['number'], [row['longitude'], row['latitude']]))

When I try to reduce with rdd = rdd.reduce(lambda number, pos: pos), I get the following error:

---------------------------------------------------------------------------
    Py4JJavaError                             Traceback (most recent call last)
    <ipython-input-32-5db1324541cf> in <module>
    ----> 1 rdd=rdd.reduce(lambda number, pos : pos)

    /opt/spark/python/pyspark/rdd.py in reduce(self, f)
        842             yield reduce(f, iterator, initial)
        843 
    --> 844         vals = self.mapPartitions(func).collect()
        845         if vals:
        846             return reduce(f, vals)

    /opt/spark/python/pyspark/rdd.py in collect(self)
        814         """
        815         with SCCallSiteSync(self.context) as css:
    --> 816             sock_info = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
        817         return list(_load_from_socket(sock_info, self._jrdd_deserializer))
        818 

    /opt/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py in __call__(self, *args)
       1255         answer = self.gateway_client.send_command(command)
       1256         return_value = get_return_value(
    -> 1257             answer, self.gateway_client, self.target_id, self.name)
       1258 
       1259         for temp_arg in temp_args:

    /opt/spark/python/pyspark/sql/utils.py in deco(*a,**kw)
         61     def deco(*a,**kw):
         62         try:
    ---> 63             return f(*a,**kw)
         64         except py4j.protocol.Py4JJavaError as e:
         65             s = e.java_exception.toString()

    /opt/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
        326                 raise Py4JJavaError(
        327                     "An error occurred while calling {0}{1}{2}.\n".
    --> 328                     format(target_id, ".", name), value)
        329             else:
        330                 raise Py4JError(

    Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
    : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 12.0 failed 1 times, most recent failure: Lost task 0.0 in stage 12.0 (TID 16, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
      File "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 377, in main
        process()
      File "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 372, in process
        serializer.dump_stream(func(split_index, iterator), outfile)
      File "/opt/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 400, in dump_stream
        vs = list(itertools.islice(iterator, batch))
      File "/opt/spark/python/lib/pyspark.zip/pyspark/util.py", line 99, in wrapper
        return f(*args,**kwargs)
      File "<ipython-input-17-aa02b6b59ccd>", line 2, in <lambda>
      File "/usr/lib64/python3.6/json/__init__.py", line 354, in loads
        return _default_decoder.decode(s)
      File "/usr/lib64/python3.6/json/decoder.py", line 342, in decode
        raise JSONDecodeError("Extra data", s, end)
    json.decoder.JSONDecodeError: Extra data: line 1 column 135 (char 134)

        at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:456)
        at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:592)
        at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:575)
        at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:410)
        at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
        at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:221)
        at org.apache.spark.storage.memory.MemoryStore.putIteratorAsBytes(MemoryStore.scala:349)
        at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1182)
        at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1156)
        at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1091)
        at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1156)
        at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:882)
        at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:357)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:308)
        at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:65)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:123)
        at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

    Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1891)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1879)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1878)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1878)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:927)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:927)
        at scala.Option.foreach(Option.scala:257)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:927)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2112)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2061)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2050)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:738)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
        at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:990)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:385)
        at org.apache.spark.rdd.RDD.collect(RDD.scala:989)
        at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:166)
        at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.lang.Thread.run(Thread.java:748)
    Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
      File "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 377, in main
        process()
      File "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 372, in process
        serializer.dump_stream(func(split_index, iterator), outfile)
      File "/opt/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 400, in dump_stream
        vs = list(itertools.islice(iterator, batch))
      File "/opt/spark/python/lib/pyspark.zip/pyspark/util.py", line 99, in wrapper
        return f(*args,**kwargs)
      File "<ipython-input-17-aa02b6b59ccd>", line 2, in <lambda>
      File "/usr/lib64/python3.6/json/__init__.py", line 354, in loads
        return _default_decoder.decode(s)
      File "/usr/lib64/python3.6/json/decoder.py", line 342, in decode
        raise JSONDecodeError("Extra data", s, end)
    json.decoder.JSONDecodeError: Extra data: line 1 column 135 (char 134)

        at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:456)
        at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:592)
        at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:575)
        at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:410)
        at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
        at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:221)
        at org.apache.spark.storage.memory.MemoryStore.putIteratorAsBytes(MemoryStore.scala:349)
        at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1182)
        at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1156)
        at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1091)
        at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1156)
        at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:882)
        at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:357)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:308)
        at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:65)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:123)
        at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        ... 1 more

Can anyone help me? I would really appreciate it.
Regards

okxuctiv1#

It looks like the trailing closing curly } and square ] brackets are never removed. After splitting on '},{', the last element still ends with '}]', so once it is re-wrapped in braces, json.loads finds a complete object followed by leftover characters, which is exactly the JSONDecodeError: Extra data in your traceback. Add the following replacements before the "{"+row+"}" wrapping step:

rdd = rdd.map(lambda row: row.replace('[', ""))

rdd = rdd.map(lambda row: row.replace('{', ""))

# add the following
rdd = rdd.map(lambda row: row.replace(']', ""))

rdd = rdd.map(lambda row: row.replace('}', ""))

Or you could consider letting the json package do all of the JSON extraction for you:

conf = SparkConf().setAppName('MyApp')
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)

FEATURES_COL = ['latitude', 'longitude']
path = 'hdfs:/public/bikes/Brisbane_CityBike.json'
rdd = sc.textFile(path)

import json
rdd = rdd.flatMap(lambda line: json.loads(line))

rdd = rdd.map(lambda row: (row['number'], [row['longitude'], row['latitude']]))
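Note that rdd.flatMap(lambda line: json.loads(line)) assumes the whole JSON array sits on a single line of the file, because sc.textFile hands json.loads one line at a time. Since the end goal is KMeans clustering, here is a sketch of how this could continue with the DataFrame API instead, assuming the file is a single JSON array that Spark's multiLine JSON reader can parse, that the column names match the sample, and taking k=3 as an arbitrary placeholder:

from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.clustering import KMeans

spark = SparkSession.builder.appName('MyApp').getOrCreate()

# multiLine=True lets Spark parse a file that contains one JSON array
# rather than one JSON object per line.
df = spark.read.option('multiLine', 'true').json('hdfs:/public/bikes/Brisbane_CityBike.json')

# Assemble the two coordinate columns into a single feature vector.
assembler = VectorAssembler(inputCols=['latitude', 'longitude'], outputCol='features')
features_df = assembler.transform(df).select('number', 'features')

# k=3 is arbitrary; pick k with ClusteringEvaluator or an elbow plot.
kmeans = KMeans(k=3, seed=1, featuresCol='features')
model = kmeans.fit(features_df)
model.transform(features_df).show(truncate=False)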
