How do I resolve an OutOfMemoryError on a Dataproc cluster running PySpark code?

rryofs0p · posted 2023-11-16 in Spark

I'm writing PySpark code that connects to a BigQuery table and imports that source table as a DataFrame. The process requires renaming the DataFrame's columns, so I defined a dictionary with the mapping, which is essentially hard-coded:

    cols_new_to_original = {'colA_new':'colA_original', 'colB_new':'colB_Original'...}

This dictionary has roughly 3,000+ key:value pairs. I then rename the DataFrame's columns using cols_new_to_original with the following steps.
Code:

    # Replace column names using cols_new_to_original
    df = df.repartition(30)
    for new_name, original_name in cols_new_to_original.items():
        df = df.withColumnRenamed(new_name, original_name)
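
For context on the failure that follows: each withColumnRenamed call returns a new DataFrame whose logical plan is one projection deeper, and that plan is built and analyzed on the driver, so 3,000+ chained renames put heavy pressure on driver memory. A minimal sketch for making that growth visible (illustrative only, not part of the original job):

    # Illustrative sketch: each rename adds a node to the logical plan,
    # which lives on the driver. Printing the plan periodically shows it
    # deepening as the loop runs.
    for i, (new_name, original_name) in enumerate(cols_new_to_original.items()):
        df = df.withColumnRenamed(new_name, original_name)
        if i % 1000 == 0:
            df.explain(extended=True)  # plan output grows with every rename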


While doing this, I get the following error:

    Traceback (most recent call last):
      File "/tmp/5642d3d6-77f7-4615-aae9-dcd4e1c9bbdb/scorer.py", line 132, in <module>
        score()
      File "/tmp/5642d3d6-77f7-4615-aae9-dcd4e1c9bbdb/scorer.py", line 90, in score
        df = df.withColumnRenamed(new_name, original_name)
      File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/dataframe.py", line 2475, in withColumnRenamed
      File "/usr/lib/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1304, in __call__
      File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 111, in deco
      File "/usr/lib/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py", line 326, in get_return_value
    py4j.protocol.Py4JJavaError: An error occurred while calling o118.withColumnRenamed.
    : java.lang.OutOfMemoryError: GC overhead limit exceeded
        at java.lang.reflect.Array.newInstance(Array.java:75)


Below is my cluster configuration:

    cluster_config = {
        "master_config": {
            "num_instances": 1,
            "machine_type_uri": "n2-standard-2",
            "disk_config": {
                "boot_disk_size_gb": 500
            }
        },
        "worker_config": {
            "num_instances": 8,
            "machine_type_uri": "n2-standard-8",
            "disk_config": {
                "boot_disk_size_gb": 1000
            }
        },
        "secondary_worker_config": {
            "num_instances": 1,
            "machine_type_uri": "n2-standard-8",
            "disk_config": {
                "boot_disk_size_gb": 1000
            },
            "preemptibility": "NON_PREEMPTIBLE"
        },
        "software_config": {
            "image_version": "2.0.27-centos8",
            "optional_components": [
                "JUPYTER"
            ],
            "properties": {
                "spark:spark.dynamicAllocation.enabled": "true",
                "spark:spark.dynamicAllocation.minExecutors": "1",
                "spark:spark.dynamicAllocation.maxExecutors": "10",
                "spark:spark.shuffle.service.enabled": "true"
            }
        }, .............................


Initially I also tried "spark:spark.executor.cores": "2" and "spark:spark.executor.memory": "16g", but I got the same issue.

Answer #1 (hts6caw3)

Thanks to @大港 for the suggestion. The OOM was occurring on the driver, not the executors, and increasing spark.driver.memory helped.
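
For reference, a minimal sketch of what that could look like in the same software_config properties block; the 6g value is an assumption rather than something from the post, and since an n2-standard-2 master has only 8 GB of RAM in total, a larger master machine type may be needed as well:

    # Sketch only: the value below is an assumption, size it to the master
    # node (the driver runs there; an n2-standard-2 has 8 GB RAM total).
    "properties": {
        "spark:spark.dynamicAllocation.enabled": "true",
        "spark:spark.shuffle.service.enabled": "true",
        "spark:spark.driver.memory": "6g"
    }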
I also had to change how the columns are renamed.
New code:

    from pyspark.sql.functions import col

    # Rename every column in one select instead of chaining withColumnRenamed
    df = df.select([col(c).alias(cols_new_to_original.get(c, c)) for c in df.columns])

This did the trick.
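
For comparison, an equivalent single-pass rename (a sketch using the same mapping): toDF builds one projection over all columns, so the logical plan stays shallow.

    # Equivalent sketch: toDF(*names) renames every column in a single
    # projection instead of one plan node per column.
    df = df.toDF(*[cols_new_to_original.get(c, c) for c in df.columns])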
