What is causing memory problems in PySpark?

fiei3ece · posted 2024-01-06 in Spark

I am new to PySpark, and I am trying to write a random forest regression model that predicts the amount of buffering during a streaming session from various network metrics. The model runs against a dataset file stored in the Hadoop Distributed File System; for context, the file is 9 MB in size.
I have been using two Ubuntu virtual machines in Oracle VirtualBox, one master and one slave. I keep running into memory problems, and I am somewhat confused as to why they are happening.
The code is as follows:

#Importing libraries
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, FloatType
from pyspark.sql.functions import mean, col
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml import Pipeline
from pyspark.ml.tuning import ParamGridBuilder
from pyspark.ml.tuning import CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator
import pandas as pd
import numpy as np

#Defining fields
Fields = [StructField("NbClients", IntegerType(), True),
          StructField("BottleneckBW", IntegerType(), True),
          StructField("BottleneckDelay", FloatType(), True),
          StructField("ClientResolution", IntegerType(), True),
          StructField("RequestDuration", IntegerType(), True),
          StructField("TCPOutputPacket", FloatType(), True),
          StructField("TCPOutputDelay", FloatType(), True),
          StructField("TCPOutputJitter", FloatType(), True),
          StructField("TCPOutputPloss", IntegerType(), True),
          StructField("TCPInputPacket", FloatType(), True),
          StructField("TCPInputDelay", FloatType(), True),
          StructField("TCPInputJitter", FloatType(), True),
          StructField("TCPInputPloss", IntegerType(), True),
          StructField("TCPInputRetrans", IntegerType(), True),
          StructField("50_InputRateVariation", IntegerType(), True),
          StructField("StdInterATimesReq", FloatType(), True),
          StructField("50_InterATimesReq", FloatType(), True),
          StructField("AvgBufferLevel", FloatType(), True)]

#Defining schema
Schema = StructType(fields=Fields)

#Starting Spark session
Spark = SparkSession.builder.master("spark://IPADDRESS:7077").getOrCreate()

#Reading the CSV from the file system
SparkDataFrame = Spark.read.csv("hdfs://IPADDRESS:9000/Dataset.csv",
                                sep=",", header=True, schema=Schema)

#Assembling all the predictive features into one features column that is used by the random forest regressor
Assembler = VectorAssembler(inputCols=["NbClients",
                                       "BottleneckBW",
                                       "BottleneckDelay",
                                       "ClientResolution",
                                       "RequestDuration",
                                       "TCPOutputPacket",
                                       "TCPOutputDelay",
                                       "TCPOutputJitter",
                                       "TCPOutputPloss",
                                       "TCPInputPacket",
                                       "TCPInputDelay",
                                       "TCPInputJitter",
                                       "TCPInputPloss",
                                       "TCPInputRetrans",
                                       "50_InputRateVariation",
                                       "StdInterATimesReq",
                                       "50_InterATimesReq"],
                            outputCol="QoSAndContextFeatures",
                            handleInvalid="keep")

#Creating the random forest regressor, with AvgBufferLevel as the label column and the assembled QoSAndContextFeatures vector as the predictive column
RandomForestRegressionModel = RandomForestRegressor(labelCol="AvgBufferLevel", featuresCol="QoSAndContextFeatures")

#Creating a two-stage prediction pipeline that ensures the data always passes through the vector assembler before the random forest model is fitted
PredictionPipeline = Pipeline(stages=[Assembler, RandomForestRegressionModel])

#Creating a parameter grid to search for the most optimal hyperparameters of the random forest model
ParameterGrid = ParamGridBuilder() \
    .addGrid(RandomForestRegressionModel.numTrees, [int(x) for x in np.linspace(start=10, stop=100, num=5)]) \
    .addGrid(RandomForestRegressionModel.maxDepth, [int(x) for x in np.linspace(start=5, stop=50, num=5)]) \
    .build()

#Building a cross validator that wraps the prediction pipeline and the parameter grid alongside an evaluator
CrossValidation = CrossValidator(estimator=PredictionPipeline,
                                 estimatorParamMaps=ParameterGrid,
                                 evaluator=RegressionEvaluator(labelCol="AvgBufferLevel"),
                                 numFolds=3)

#Splitting the data into training and testing data
(TrainingData, TestingData) = SparkDataFrame.randomSplit([0.8, 0.2])

#Fitting the cross-validated model to the training data
CrossValidatedModel = CrossValidation.fit(TrainingData)

#Making predictions on the testing data
Predictions = CrossValidatedModel.transform(TestingData)

#Declaring an evaluator for the R squared of the model
RSquaredEvaluator = RegressionEvaluator(labelCol="AvgBufferLevel", predictionCol="prediction", metricName="r2")

#Declaring an evaluator for the RMSE of the model
RMSEEvaluator = RegressionEvaluator(labelCol="AvgBufferLevel", predictionCol="prediction", metricName="rmse")

#Calculating the R squared of the model
RSquared = RSquaredEvaluator.evaluate(Predictions)

#Calculating the RMSE of the model
RMSE = RMSEEvaluator.evaluate(Predictions)

#Putting the mean observation of AvgBufferLevel from the test dataset into a list of Rows
MeanObservationArray = TestingData.select(mean(col("AvgBufferLevel"))).collect()

#Extracting the mean observation value from the first Row
MeanObservation = MeanObservationArray[0][0]

#Calculating the scatter index, which expresses the RMSE as a proportion of the mean observation
ScatterIndex = RMSE / MeanObservation

#Printing R squared
print("R Squared:", RSquared)

#Printing RMSE
print("RMSE:", RMSE)

#Printing the scatter index
print("Scatter Index:", ScatterIndex)

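For a sense of how much work the cross validation above sets up: the grid has five numTrees values and five maxDepth values, so with three folds the cross validator fits the pipeline 75 times. A quick bit of arithmetic (purely illustrative, not part of the job itself):

import numpy as np

NumTreesValues = [int(x) for x in np.linspace(start=10, stop=100, num=5)]  # [10, 32, 55, 77, 100]
MaxDepthValues = [int(x) for x in np.linspace(start=5, stop=50, num=5)]    # [5, 16, 27, 38, 50]
NumFolds = 3
TotalFits = len(NumTreesValues) * len(MaxDepthValues) * NumFolds           # 5 * 5 * 3 = 75 pipeline fits
print("Total pipeline fits during cross validation:", TotalFits)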
I first tried running the model on a system with 8 GB of RAM in total, allocating 2 GB of RAM to each VM. However, when I tried to run the code in Spark on the master VM, Linux printed the following message partway through execution:

Killed


I looked at the Spark master web UI to see what error had been output, and it was something along the lines of:

ERROR CoarseGrainedExecutorBackend: Driver disassociated.


When I searched for this error, I concluded that it was related to not having enough RAM, so I increased the RAM for each VM to 3 GB and tried the code again. However, when I did this, the slave VM was aborted by VirtualBox partway through execution, because my system did not have enough RAM to keep both Ubuntu VMs running.
I therefore tried simplifying the random forest code by removing the two-stage pipeline and setting the hyperparameters up front instead of searching for them, wondering whether those parts were driving up the RAM usage and hoping this would fix the problem. The simplified code is as follows:

#Importing libraries
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, FloatType
from pyspark.sql.functions import mean, col
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml import Pipeline
from pyspark.ml.tuning import ParamGridBuilder
from pyspark.ml.tuning import CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator
import pandas as pd
import numpy as np

#Defining fields
Fields = [StructField("NbClients", IntegerType(), True),
          StructField("BottleneckBW", IntegerType(), True),
          StructField("BottleneckDelay", FloatType(), True),
          StructField("ClientResolution", IntegerType(), True),
          StructField("RequestDuration", IntegerType(), True),
          StructField("TCPOutputPacket", FloatType(), True),
          StructField("TCPOutputDelay", FloatType(), True),
          StructField("TCPOutputJitter", FloatType(), True),
          StructField("TCPOutputPloss", IntegerType(), True),
          StructField("TCPInputPacket", FloatType(), True),
          StructField("TCPInputDelay", FloatType(), True),
          StructField("TCPInputJitter", FloatType(), True),
          StructField("TCPInputPloss", IntegerType(), True),
          StructField("TCPInputRetrans", IntegerType(), True),
          StructField("50_InputRateVariation", IntegerType(), True),
          StructField("StdInterATimesReq", FloatType(), True),
          StructField("50_InterATimesReq", FloatType(), True),
          StructField("AvgBufferLevel", FloatType(), True)]

#Defining schema
Schema = StructType(fields=Fields)

#Starting Spark session
Spark = SparkSession.builder.master("spark://192.168.56.112:7077").getOrCreate()

#Reading the CSV from the file system
SparkDataFrame = Spark.read.csv("hdfs://192.168.56.112:9000/CleanedAssignmentDataset.csv",
                                sep=",", header=True, schema=Schema)

#Assembling all the predictive features into one features column that is used by the random forest regressor
Assembler = VectorAssembler(inputCols=["NbClients",
                                       "BottleneckBW",
                                       "BottleneckDelay",
                                       "ClientResolution",
                                       "RequestDuration",
                                       "TCPOutputPacket",
                                       "TCPOutputDelay",
                                       "TCPOutputJitter",
                                       "TCPOutputPloss",
                                       "TCPInputPacket",
                                       "TCPInputDelay",
                                       "TCPInputJitter",
                                       "TCPInputPloss",
                                       "TCPInputRetrans",
                                       "50_InputRateVariation",
                                       "StdInterATimesReq",
                                       "50_InterATimesReq"],
                            outputCol="QoSAndContextFeatures",
                            handleInvalid="keep")

#Transforming the DataFrame into one with the combined features vector column
AssembledData = Assembler.transform(SparkDataFrame)

#Creating the random forest regressor with fixed hyperparameters, AvgBufferLevel as the label column and QoSAndContextFeatures as the predictive column
RandomForestRegressionModel = RandomForestRegressor(labelCol="AvgBufferLevel", featuresCol="QoSAndContextFeatures", numTrees=100, maxDepth=30)

#Splitting the data into training and testing data
(TrainingData, TestingData) = AssembledData.randomSplit([0.8, 0.2])

#Fitting the random forest to the training data
RandomForestFitted = RandomForestRegressionModel.fit(TrainingData)

#Making predictions on the testing data
Predictions = RandomForestFitted.transform(TestingData)

#Declaring an evaluator for the R squared of the model
RSquaredEvaluator = RegressionEvaluator(labelCol="AvgBufferLevel", predictionCol="prediction", metricName="r2")

#Declaring an evaluator for the RMSE of the model
RMSEEvaluator = RegressionEvaluator(labelCol="AvgBufferLevel", predictionCol="prediction", metricName="rmse")

#Calculating the R squared of the model
RSquared = RSquaredEvaluator.evaluate(Predictions)

#Calculating the RMSE of the model
RMSE = RMSEEvaluator.evaluate(Predictions)

#Putting the mean observation of AvgBufferLevel from the test dataset into a list of Rows
MeanObservationArray = TestingData.select(mean(col("AvgBufferLevel"))).collect()

#Extracting the mean observation value from the first Row
MeanObservation = MeanObservationArray[0][0]

#Calculating the scatter index, which expresses the RMSE as a proportion of the mean observation
ScatterIndex = RMSE / MeanObservation

#Printing R squared
print("R Squared:", RSquared)

#Printing RMSE
print("RMSE:", RMSE)

#Printing the scatter index
print("Scatter Index:", ScatterIndex)


However, this code still hit the same problems: it was still killed by Linux when I allocated 2 GB, and the slave VM still died partway through execution when I allocated 3 GB.
I therefore decided to try the code on a system with 16 GB of RAM. I reconfigured the VMs on that system with 4 GB of RAM each, but when I tried to run the simplified code, Linux again returned "Killed" partway through execution.
I then tried increasing the RAM on each VM to 5 GB and running the simplified code. However, I received the following error:

java.lang.OutOfMemoryError: Java heap space


I did some research and checked the spark-defaults.conf file; the Spark memory there is set to 5 GB by default.
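For reference, the relevant settings look roughly like this; the values below are placeholders rather than my actual configuration. Both the driver heap and the executor heap have to fit inside the RAM actually given to the VMs, alongside the operating system itself:

# spark-defaults.conf (placeholder values, not my actual settings)
spark.driver.memory      2g
spark.executor.memory    2g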
I must admit I am completely stumped as to what is causing this error. My dataset CSV file is only 9 MB in size. On top of that, I have already managed to write a random forest regressor in plain Python with the scikit-learn library, and it ran fine even on the lower-powered 8 GB system, parameter grid search and all. It took a minute or two to run, but it worked.
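For comparison, the scikit-learn version was along roughly these lines (a minimal sketch; the file path and the grid values here are illustrative placeholders rather than my exact ones):

import pandas as pd
from sklearn.ensemble import RandomForestRegressor
from sklearn.model_selection import GridSearchCV, train_test_split

#Loading a local copy of the same dataset (placeholder path)
Data = pd.read_csv("Dataset.csv")
X = Data.drop(columns=["AvgBufferLevel"])
y = Data["AvgBufferLevel"]
XTrain, XTest, yTrain, yTest = train_test_split(X, y, test_size=0.2)

#Grid-searching a random forest regressor (illustrative grid values)
ParameterGrid = {"n_estimators": [10, 50, 100], "max_depth": [5, 10, 20]}
GridSearch = GridSearchCV(RandomForestRegressor(), ParameterGrid, cv=3, scoring="r2")
GridSearch.fit(XTrain, yTrain)
print(GridSearch.best_params_, GridSearch.score(XTest, yTest))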
Does anyone have any idea what is causing my memory problems? I admit I am thoroughly stuck.
Any advice is greatly appreciated.


c3frrgcw1#

I think I have now found the root of the problem and got my code working.
After further investigation, I believe the issue was that I was a bit too ambitious with the hyperparameters of the random forest regressor. When I tried lower hyperparameter values, the code ran fine and the killing stopped.
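For anyone hitting the same thing, the change was roughly along these lines; the exact values below are illustrative rather than the ones I finally settled on:

from pyspark.ml.regression import RandomForestRegressor

#More modest settings (illustrative values). Spark ML caps maxDepth at 30, and both
#the tree depth and the number of trees grow the size of the fitted model quickly.
RandomForestRegressionModel = RandomForestRegressor(labelCol="AvgBufferLevel",
                                                    featuresCol="QoSAndContextFeatures",
                                                    numTrees=20,
                                                    maxDepth=10)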
Thank you all very much for the help!
