pyspark Py4Java错误:调用None.org.apache.spark.api.java时发生错误,

eiee3dmh  于 2022-11-01  发布在  Spark
关注(0)|答案(1)|浏览(260)

任何人都知道为什么我一直得到这个错误在Jupyter笔记本电脑???我一直试图加载我的Tensorflow模型到Apache Spark vis SparlFlow,但我似乎不知道如何得到过去这个错误。任何帮助将不胜感激。
第一个木星细胞:

from sparkflow.graph_utils import build_graph
from sparkflow.tensorflow_async import SparkAsyncDL
import tensorflow as tf
from pyspark.ml.feature import VectorAssembler, OneHotEncoder
from pyspark.ml.pipeline import Pipeline

 from pyspark.sql import SparkSession
    from tensorflow.keras import layers
    from tensorflow.keras import losses

第二木星细胞:

def lstm_model(X_train, y_train):
    # Reshapes to input neuron
    inputs= keras.Input(shape = (X_train.shape[1], 1))\
    #Training Layers
    x_1 = layers.LSTM(units=50, return_sequences=True, input_shape=(X_train.shape[1], 1))(inputs)
    x_1 = layers.Dropout(0.2)(x_1)
    x_1 = layers.LSTM(units = 50, return_sequences = True)(x_1)
    x_1 = layers.Dropout(0.2)(x_1)
    x_1 = layers.LSTM(units = 50, return_sequences = True)(x_1)
    x_1 = layers.Dropout(0.2)(x_1)
    x_1 = layers.LSTM(units = 50, return_sequences = True)(x_1)
    x_1 = layers.Dropout(0.2)(x_1)
    x_1 = layers.Flatten()(x_1)

    # 1 output neuron for each column prediction
    output = Dense(units=1)(x_1)
    return losses.MeanSquaredError(y_train,output)

第三木星细胞:

def dataframe_input(pandas_dataframe):

    train_data = pandas_dataframe[self.column_name].values

    # Reshaping to a 2D array
    train_data = train_data.reshape(-1,1)
    print(train_data.dtype)
    print(type(train_data))
    print(train_data.shape) 

    # Feature Scaling
    scaler = MinMaxScaler(feature_range=(0, 1))
    scaled_train_data =scaler.fit_transform(train_data)

    # Initialzing each x_train and y_train datasets for each column
    X_train = []
    y_train = []

    # Appending scaled training data to each dataset
    for i in range(self.timesteps, len(train_data)):
        X_train.append(scaled_train_data[i - self.timesteps:i, 0])
        y_train.append(scaled_train_data[i, 0])

    # Numpy array creation, Keras requires numpy arrays for Inputs
    X_train, y_train = np.array(X_train, dtype=int), np.array(y_train)
    print(X_train.shape)
    print(X_train.dtype)

    # Reshaping to a 3D matrix (970, 30, 1)
    #X_train = np.reshape(X_train, (X_train[0], X_train[1], 1))
    print(X_train.shape)
    return X_train, y_train

第四木星细胞(我得到的错误):


# Spark Session

# In order to use APIs of SQL, HIVE, and Streaming, no need to create separate contexts as sparkSession includes all the APIs.

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .getOrCreate()

# Reading CSVto a Spark DataFrame

df = spark.read.option("inferSchema", "true").csv('"../csv_test_files/stats.csv"')

# Convert the Spark dataframe into a Pandas Dataframe

pandas_dataframe = df.select("*").toPandas()

# Get the input and ouput data for passing to the model

X_train, y_train = dataframe_input(pandas_dataframe)

错误输出:

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-25-5143cc437b69> in <module>
      3 spark = SparkSession \
      4     .builder \
----> 5     .appName("Python Spark SQL basic example") \
      6     .getOrCreate()
      7 

~/anaconda3/lib/python3.7/site-packages/pyspark/sql/session.py in getOrCreate(self)
    171                     for key, value in self._options.items():
    172                         sparkConf.set(key, value)
--> 173                     sc = SparkContext.getOrCreate(sparkConf)
    174                     # This SparkContext may be an existing one.
    175                     for key, value in self._options.items():

~/anaconda3/lib/python3.7/site-packages/pyspark/context.py in getOrCreate(cls, conf)
    365         with SparkContext._lock:
    366             if SparkContext._active_spark_context is None:
--> 367                 SparkContext(conf=conf or SparkConf())
    368             return SparkContext._active_spark_context
    369 

~/anaconda3/lib/python3.7/site-packages/pyspark/context.py in __init__(self, master, appName, sparkHome, pyFiles, environment, batchSize, serializer, conf, gateway, jsc, profiler_cls)
    134         try:
    135             self._do_init(master, appName, sparkHome, pyFiles, environment, batchSize, serializer,
--> 136                           conf, jsc, profiler_cls)
    137         except:
    138             # If an error occurs, clean up in order to allow future SparkContext creation:

~/anaconda3/lib/python3.7/site-packages/pyspark/context.py in _do_init(self, master, appName, sparkHome, pyFiles, environment, batchSize, serializer, conf, jsc, profiler_cls)
    196 
    197         # Create the Java SparkContext through Py4J
--> 198         self._jsc = jsc or self._initialize_context(self._conf._jconf)
    199         # Reset the SparkConf to the one actually used by the SparkContext in JVM.
    200         self._conf = SparkConf(_jconf=self._jsc.sc().conf())

~/anaconda3/lib/python3.7/site-packages/pyspark/context.py in _initialize_context(self, jconf)
    304         Initialize SparkContext in function to allow subclass specific initialization
    305         """
--> 306         return self._jvm.JavaSparkContext(jconf)
    307 
    308     @classmethod

~/anaconda3/lib/python3.7/site-packages/py4j/java_gateway.py in __call__(self, *args)
   1523         answer = self._gateway_client.send_command(command)
   1524         return_value = get_return_value(
-> 1525             answer, self._gateway_client, None, self._fqn)
   1526 
   1527         for temp_arg in temp_args:

~/anaconda3/lib/python3.7/site-packages/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    326                 raise Py4JJavaError(
    327                     "An error occurred while calling {0}{1}{2}.\n".
--> 328                     format(target_id, ".", name), value)
    329             else:
    330                 raise Py4JError(

Py4JJavaError: An error occurred while calling None.org.apache.spark.api.java.JavaSparkContext.
: java.net.BindException: Can't assign requested address: Service 'sparkDriver' failed after 16 retries (on a random free port)! Consider explicitly setting the appropriate binding address for the service 'sparkDriver' (for example spark.driver.bindAddress for SparkDriver) to the correct binding address.
    at java.base/sun.nio.ch.Net.bind0(Native Method)
    at java.base/sun.nio.ch.Net.bind(Net.java:461)
    at java.base/sun.nio.ch.Net.bind(Net.java:453)
    at java.base/sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:227)
    at io.netty.channel.socket.nio.NioServerSocketChannel.doBind(NioServerSocketChannel.java:128)
    at io.netty.channel.AbstractChannel$AbstractUnsafe.bind(AbstractChannel.java:558)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.bind(DefaultChannelPipeline.java:1283)
    at io.netty.channel.AbstractChannelHandlerContext.invokeBind(AbstractChannelHandlerContext.java:501)
    at io.netty.channel.AbstractChannelHandlerContext.bind(AbstractChannelHandlerContext.java:486)
    at io.netty.channel.DefaultChannelPipeline.bind(DefaultChannelPipeline.java:989)
    at io.netty.channel.AbstractChannel.bind(AbstractChannel.java:254)
    at io.netty.bootstrap.AbstractBootstrap$2.run(AbstractBootstrap.java:364)
    at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:403)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463)
    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
    at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
    at java.base/java.lang.Thread.run(Thread.java:834)
izj3ouym

izj3ouym1#

看起来你运行的SparkSessions太多了。在默认配置中你只能有16个,因为要为Spark的作业概述页面获取一个端口需要16次重试。
这可能是因为您在一个繁忙的集群上工作,有许多用户在运行作业,或者,例如,因为您有许多运行SparkSessions的Jupyter笔记本。
根据您使用的资源管理器,有不同的方法来检查当前打开了多少SparkSessions。
为了避免这个问题,你也可以增加Spark在创建SparkSession时创建的未使用端口的重试次数。为此,你必须将配置参数spark.port.maxRetries设置为一个更大的值(另请参阅此处:(https://spark.apache.org/docs/latest/configuration.html):

spark = SparkSession.builder.config('spark.port.maxRetries', 100).getOrCreate()

相关问题