What is causing memory problems in PySpark?

Posted by fiei3ece on 2024-01-06 in Spark

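I am new to PySpark and am trying to write a random forest regression model that predicts the amount of buffering during a streaming session from various network metrics. The model runs on a dataset file stored in the Hadoop Distributed File System (HDFS). For context, the file is 9 MB.
I have been using two Ubuntu virtual machines on Oracle VirtualBox, one master VM and one slave VM. I keep running into memory problems and I am confused about why they occur.
The code is as follows: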

#Importing libraries
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, FloatType
from pyspark.sql.functions import mean, col
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml import Pipeline
from pyspark.ml.tuning import ParamGridBuilder
from pyspark.ml.tuning import CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator
import pandas as pd
import numpy as np

#Defining fields
Fields = [StructField("NbClients", IntegerType(), True),
    StructField("BottleneckBW", IntegerType(), True),
    StructField("BottleneckDelay", FloatType(), True),
    StructField("ClientResolution", IntegerType(), True),
    StructField("RequestDuration", IntegerType(), True),
    StructField("TCPOutputPacket", FloatType(), True),
    StructField("TCPOutputDelay", FloatType(), True),
    StructField("TCPOutputJitter", FloatType(), True),
    StructField("TCPOutputPloss", IntegerType(), True),
    StructField("TCPInputPacket", FloatType(), True),
    StructField("TCPInputDelay", FloatType(), True),
    StructField("TCPInputJitter", FloatType(), True),
    StructField("TCPInputPloss", IntegerType(), True),
    StructField("TCPInputRetrans", IntegerType(), True),
    StructField("50_InputRateVariation", IntegerType(), True),
    StructField("StdInterATimesReq", FloatType(), True),
    StructField("50_InterATimesReq", FloatType(), True),
    StructField("AvgBufferLevel", FloatType(), True)
]

#Defining schema
Schema = StructType(fields=Fields)

#Starting Spark session
Spark = SparkSession.builder.master("spark://IPADDRESS:7077").getOrCreate()

#Reading CSV from file system
SparkDataFrame = Spark.read.csv("hdfs://IPADDRESS:9000/Dataset.csv", 
sep=",", header=True, schema=Schema)

#Assembling all the predictive features into one features column that is used by the random forest regressor
Assembler = VectorAssembler(inputCols=[
"NbClients",
"BottleneckBW",
"BottleneckDelay",
"ClientResolution",
"RequestDuration",
"TCPOutputPacket",
"TCPOutputDelay",
"TCPOutputJitter",
"TCPOutputPloss",
"TCPInputPacket",
"TCPInputDelay",
"TCPInputJitter",
"TCPInputPloss",
"TCPInputRetrans",
"50_InputRateVariation",
"StdInterATimesReq",
"50_InterATimesReq"], outputCol="QoSAndContextFeatures",
handleInvalid="keep")

#Creating random forest regressor, with AvgBufferLevel as the label column and the newly created vector QoSAndContextFeatures as the predictive column
RandomForestRegressionModel = RandomForestRegressor(labelCol="AvgBufferLevel", featuresCol="QoSAndContextFeatures")

#Creating a two-stage prediction pipeline that ensures that the data is always passed through the vector assembler prior to the random forest model being created
PredictionPipeline = Pipeline(stages=[Assembler, RandomForestRegressionModel])

#Creating a parameter grid to search for the best hyperparameters of the random forest model
ParameterGrid = ParamGridBuilder() \
            .addGrid(RandomForestRegressionModel.numTrees, [int(x) for x in np.linspace(start=10, stop=100, num=5)]) \
            .addGrid(RandomForestRegressionModel.maxDepth, [int(x) for x in np.linspace(start=5, stop=50, num=5)]) \
            .build()

#Building a cross validator of the random forest model that encapsulates the prediction pipeline and the parameter grid alongside an evaluator
CrossValidation = CrossValidator(estimator=PredictionPipeline,
                             estimatorParamMaps=ParameterGrid,
                             evaluator=RegressionEvaluator(labelCol="AvgBufferLevel"),
                             numFolds=3)

#Splitting the data into training and testing data
(TrainingData, TestingData) = SparkDataFrame.randomSplit([0.8,0.2])

#Fitting the cross validated model to the training data
CrossValidatedModel = CrossValidation.fit(TrainingData)

#Making predictions on the testing data
Predictions = CrossValidatedModel.transform(TestingData)

#Declaring evaluator of RSquared to evaluate the RSquared of the model (the regressor writes to the default "prediction" column)
RSquaredEvaluator = RegressionEvaluator(labelCol="AvgBufferLevel", predictionCol="prediction", metric="r2")

#Declaring evaluator of RMSE to evaluate the RMSE of the model
RMSEEvaluator = RegressionEvaluator(labelCol="AvgBufferLevel", predictionCol="prediction", metric="rmse")

#Calculating the RSquared of the model
RSquared = RSquaredEvaluator.evaluate(Predictions)

#Calculating the RMSE of the model
RMSE = RMSEEvaluator.evaluate(Predictions)

#Putting the mean observation of AvgBufferLevel from the test dataset into an array
MeanObservationArray = TestingData.select(mean(col("AvgBufferLevel"))).collect()

#Collecting the mean observation (collect() returns a list of Row objects, so take the first field of the first row)
MeanObservation = MeanObservationArray[0][0]

#Calculating the scatter index, which represents RMSE as a proportion of the mean observation
ScatterIndex = RMSE/MeanObservation

#Printing RSquared
print("R Squared:",RSquared)

#Printing RMSE
print("RMSE:",RMSE)

#Printing scatter index
print("Scatter Index:",ScatterIndex)
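
As a rough check on how much work the cross-validation above requests, the sketch below expands the two `np.linspace` grids by hand and counts the model fits. It uses plain Python instead of NumPy: `linspace_ints` is a hypothetical stand-in that reproduces `np.linspace` followed by `int()` for these particular inputs. The limit of 30 is the documented upper bound for `maxDepth` in Spark ML's tree models.

```python
def linspace_ints(start, stop, num):
    """Evenly spaced values from start to stop (inclusive), truncated to int."""
    step = (stop - start) / (num - 1)
    return [int(start + i * step) for i in range(num)]


# The same grids that the ParamGridBuilder calls in the question produce
num_trees_grid = linspace_ints(10, 100, 5)
max_depth_grid = linspace_ints(5, 50, 5)

num_folds = 3
combinations = len(num_trees_grid) * len(max_depth_grid)

# CrossValidator fits every combination once per fold,
# then refits the best combination on the full training data.
total_fits = combinations * num_folds + 1

# Spark ML documents maxDepth as having to lie in the range [0, 30]
SPARK_MAX_DEPTH = 30
too_deep = [d for d in max_depth_grid if d > SPARK_MAX_DEPTH]

print("numTrees values:", num_trees_grid)
print("maxDepth values:", max_depth_grid)
print("pipeline fits requested:", total_fits)
print("maxDepth values above Spark's limit:", too_deep)
```

Under these assumptions the grid asks for 76 pipeline fits, several of them with 100 trees at depth 27 or more, and two of the five `maxDepth` values lie above the range Spark accepts.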

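I first tried running the model on a system with 8 GB of RAM in total, with 2 GB allocated to each VM. However, when I ran the code in Spark on the master VM, I received this message from Linux partway through execution: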

Killed


I checked the Spark master web UI to see what error had been reported, and it was something along the lines of:

ERROR CoarseGrainedBackEndExecutor: Driver disassociated.


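When I searched for this error, I concluded that it was related to insufficient RAM, so I increased each VM's RAM to 3 GB and ran the code again. This time, because my system did not have enough RAM to run two Ubuntu VMs of that size, VirtualBox aborted the slave VM partway through execution.
So I simplified the random forest code by removing the two-stage pipeline and setting the hyperparameters up front instead of searching for them. I suspected that the pipeline and the search were increasing my code's RAM usage, so I hoped this would solve the problem. The simplified code is as follows: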

#Importing libraries
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, FloatType
from pyspark.sql.functions import mean, col
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml import Pipeline
from pyspark.ml.tuning import ParamGridBuilder
from pyspark.ml.tuning import CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator
import pandas as pd
import numpy as np

#Defining fields
Fields = [StructField("NbClients", IntegerType(), True),
    StructField("BottleneckBW", IntegerType(), True),
    StructField("BottleneckDelay", FloatType(), True),
    StructField("ClientResolution", IntegerType(), True),
    StructField("RequestDuration", IntegerType(), True),
    StructField("TCPOutputPacket", FloatType(), True),
    StructField("TCPOutputDelay", FloatType(), True),
    StructField("TCPOutputJitter", FloatType(), True),
    StructField("TCPOutputPloss", IntegerType(), True),
    StructField("TCPInputPacket", FloatType(), True),
    StructField("TCPInputDelay", FloatType(), True),
    StructField("TCPInputJitter", FloatType(), True),
    StructField("TCPInputPloss", IntegerType(), True),
    StructField("TCPInputRetrans", IntegerType(), True),
    StructField("50_InputRateVariation", IntegerType(), True),
    StructField("StdInterATimesReq", FloatType(), True),
    StructField("50_InterATimesReq", FloatType(), True),
    StructField("AvgBufferLevel", FloatType(), True)
]

#Defining schema
Schema = StructType(fields=Fields)

#Starting Spark session
Spark = SparkSession.builder.master("spark://192.168.56.112:7077").getOrCreate()

#Reading CSV from file system
SparkDataFrame = Spark.read.csv("hdfs://192.168.56.112:9000/CleanedAssignmentDataset.csv", 
sep=",", header=True, schema=Schema)

#Assembling all the predictive features into one features column that is used by the random forest regressor
Assembler = VectorAssembler(inputCols=[
"NbClients",
"BottleneckBW",
"BottleneckDelay",
"ClientResolution",
"RequestDuration",
"TCPOutputPacket",
"TCPOutputDelay",
"TCPOutputJitter",
"TCPOutputPloss",
"TCPInputPacket",
"TCPInputDelay",
"TCPInputJitter",
"TCPInputPloss",
"TCPInputRetrans",
"50_InputRateVariation",
"StdInterATimesReq",
"50_InterATimesReq"], outputCol="QoSAndContextFeatures",
handleInvalid="keep")

#Transforming DataFrame into vector with combined features column
AssembledData = Assembler.transform(SparkDataFrame)

#Creating random forest regressor, with AvgBufferLevel as the label column and the newly created vector QoSAndContextFeatures as the predictive column
RandomForestRegressionModel = RandomForestRegressor(labelCol="AvgBufferLevel", featuresCol="QoSAndContextFeatures", numTrees=100, maxDepth=30)

#Splitting the data into training and testing data
(TrainingData, TestingData) = AssembledData.randomSplit([0.8,0.2])

#Fitting the random forest to the training data
RandomForestFitted = RandomForestRegressionModel.fit(TrainingData)

#Making predictions on the testing data
Predictions = RandomForestFitted.transform(TestingData)

#Declaring evaluator of RSquared to evaluate the RSquared of the model (the regressor writes to the default "prediction" column)
RSquaredEvaluator = RegressionEvaluator(labelCol="AvgBufferLevel", predictionCol="prediction", metric="r2")

#Declaring evaluator of RMSE to evaluate the RMSE of the model
RMSEEvaluator = RegressionEvaluator(labelCol="AvgBufferLevel", predictionCol="prediction", metric="rmse")

#Calculating the RSquared of the model
RSquared = RSquaredEvaluator.evaluate(Predictions)

#Calculating the RMSE of the model
RMSE = RMSEEvaluator.evaluate(Predictions)

#Putting the mean observation of AvgBufferLevel from the test dataset into an array
MeanObservationArray = TestingData.select(mean(col("AvgBufferLevel"))).collect()

#Collecting the mean observation (collect() returns a list of Row objects, so take the first field of the first row)
MeanObservation = MeanObservationArray[0][0]

#Calculating the scatter index, which represents RMSE as a proportion of the mean observation
ScatterIndex = RMSE/MeanObservation

#Printing RSquared
print("R Squared:",RSquared)

#Printing RMSE
print("RMSE:",RMSE)

#Printing scatter index
print("Scatter Index:",ScatterIndex)
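
To put `numTrees=100, maxDepth=30` in perspective, here is a back-of-the-envelope bound on how large such a forest is allowed to grow. It is only an upper bound, not a measurement of Spark's memory use. A binary tree of depth d has at most 2^(d+1) - 1 nodes, and a tree whose leaves each hold at least one training row has at most 2n - 1 nodes for n rows. The row count of 100,000 is a made-up placeholder, since the question does not state how many rows the 9 MB file contains.

```python
def max_nodes_by_depth(max_depth):
    """Node count of a complete binary tree of the given depth."""
    return 2 ** (max_depth + 1) - 1


def max_nodes_by_rows(n_rows):
    """Node count of a binary tree with one leaf per training row."""
    return 2 * n_rows - 1


def forest_node_bound(num_trees, max_depth, n_rows):
    """Upper bound on the total number of nodes in the forest."""
    per_tree = min(max_nodes_by_depth(max_depth), max_nodes_by_rows(n_rows))
    return num_trees * per_tree


# ASSUMPTION: hypothetical row count, not taken from the question
ASSUMED_ROWS = 100_000

deep_forest = forest_node_bound(num_trees=100, max_depth=30, n_rows=ASSUMED_ROWS)
shallow_forest = forest_node_bound(num_trees=100, max_depth=5, n_rows=ASSUMED_ROWS)

print("depth 30 bound:", deep_forest)
print("depth 5 bound:", shallow_forest)
```

At depth 30 the bound is set by the data rather than by the depth, so each tree may keep splitting until it has nearly one leaf per row. Under the assumed row count, the depth 5 forest is bounded at a size thousands of times smaller.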


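However, this code still has the same problem: with 2 GB allocated it is still killed by Linux, and with 3 GB allocated the slave VM still dies partway through execution.
So I decided to try the code on a system with 16 GB of RAM. I reconfigured the VMs on this system with 4 GB of RAM each, but when I ran the simplified code, Linux again returned "Killed" partway through execution.
I then increased the RAM on each VM to 5 GB and ran the simplified code again. This time I received the following error: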

java.lang.OutOfMemoryError: Java heap space


I did some research and checked the spark-defaults.conf file. My Spark memory is set to 5 GB by default.
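For reference, one way to make the memory limits explicit in the script itself is sketched below. The keys `spark.driver.memory` and `spark.executor.memory` are standard Spark properties, while the helper names and the 2 GB values in the usage note are hypothetical placeholders, not settings from the question. A JVM heap as large as the VM's entire RAM leaves nothing for the operating system or the Python process, which is one common way to end up with a bare `Killed` from the Linux out-of-memory killer.

```python
def memory_settings(driver_gb, executor_gb):
    """Return Spark memory properties in the string form Spark expects, e.g. '2g'."""
    return {
        "spark.driver.memory": f"{driver_gb}g",
        "spark.executor.memory": f"{executor_gb}g",
    }


def build_session(master_url, settings):
    """Create a SparkSession with the given properties applied."""
    # Imported here so memory_settings() can be checked without Spark installed
    from pyspark.sql import SparkSession

    builder = SparkSession.builder.master(master_url)
    for key, value in settings.items():
        builder = builder.config(key, value)
    return builder.getOrCreate()
```

A call such as `build_session("spark://IPADDRESS:7077", memory_settings(2, 2))` would then replace the `SparkSession.builder` line. Spark's documentation notes that in client mode `spark.driver.memory` cannot be changed once the driver JVM has started, so if the script is launched through `spark-submit` the driver value may need to go in `--driver-memory` or `spark-defaults.conf` instead.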
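I have to admit I am completely stumped as to what is causing this error. My dataset CSV file is only 9 MB. Moreover, I have already written a random forest regressor in regular Python with the scikit-learn library, and it runs fine even on the lower-powered 8 GB system, parameter grid search and all. It took a minute or two, but it ran without problems.
Does anyone know what is causing my memory problems? I admit I am really stuck.
Any advice is much appreciated.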

Answer #1 by c3frrgcw

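I think I have now found the root of the problem and got my code working.
After further investigation, I believe the problem was that I was a little too ambitious with the random forest regressor's hyperparameters. When I tried lower hyperparameter values, the code ran fine and the "Killed" messages stopped.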
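As an illustration of what lower hyperparameters could look like, the sketch below builds a smaller grid. The specific values are assumptions chosen for the example, since this answer does not say which values were actually used, and the helper names are hypothetical. `rf` is expected to be a `RandomForestRegressor`, such as `RandomForestRegressionModel` in the question.

```python
# Spark ML documents maxDepth as having to lie in the range [0, 30]
SPARK_MAX_DEPTH = 30

# ASSUMPTION: illustrative values only, not the ones used by the answer's author
MODEST_VALUES = {"numTrees": [10, 20, 50], "maxDepth": [5, 10, 15]}


def check_values(values):
    """Reject depths Spark does not accept and return the number of combinations."""
    too_deep = [d for d in values["maxDepth"] if d > SPARK_MAX_DEPTH]
    if too_deep:
        raise ValueError(f"maxDepth values above {SPARK_MAX_DEPTH}: {too_deep}")
    return len(values["numTrees"]) * len(values["maxDepth"])


def build_param_grid(rf, values):
    """Build a ParamGridBuilder grid for the given regressor."""
    # Imported here so check_values() can be used without Spark installed
    from pyspark.ml.tuning import ParamGridBuilder

    check_values(values)
    return (
        ParamGridBuilder()
        .addGrid(rf.numTrees, values["numTrees"])
        .addGrid(rf.maxDepth, values["maxDepth"])
        .build()
    )
```

In the first script, `ParameterGrid = build_param_grid(RandomForestRegressionModel, MODEST_VALUES)` would take the place of the original grid, giving 9 combinations instead of 25 and much shallower trees.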
Thanks very much, everyone, for your help!
