Dask iterations take longer than the previous one

kuhbmx9i · posted 2021-07-13 · tagged Spark

I am currently implementing a variant of Dijkstra's algorithm for flight departures in both Dask and Spark (for comparison purposes), which involves a sequential computation over the nodes of a graph. In addition, at every step I filter out some records from the graph (nodes) because they become infeasible due to their departure time. However, even though the dataframe keeps getting smaller, each new iteration takes longer than the previous one. I solved this in Spark by writing the intermediate results to parquet, but I have not been able to solve it in Dask.
I suspect that the dataframe is being re-executed from scratch at every step of the graph, but I have not been able to prevent that from happening.
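To make that suspicion concrete, here is a minimal, self-contained toy sketch (the column name, sizes and filter condition are made up, not my real data) showing how the lazy task graph keeps growing across iterations when nothing is persisted or written out:

    import numpy as np
    import pandas as pd
    import dask.dataframe as dd

    # toy stand-in for the flights dataframe; "dep_time" is a made-up column
    pdf = pd.DataFrame({"dep_time": np.random.randint(0, 1440, size=10_000)})
    df = dd.from_pandas(pdf, npartitions=4)

    for step in range(5):
        # each step filters out records that are no longer feasible
        df = df[df["dep_time"] > step * 100]
        # the lazy graph still contains every previous filter, so it grows each step
        print(step, len(df.__dask_graph__()))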
So far I have tried the following approaches:
Using persist (this is the fastest option). However, every iteration increases the number of tasks to be completed shown in the UI. For example: iteration 8 shows x/800, iteration 9 shows x/900 (I am using 100 partitions).

    while i < n_nodes:
        i += 1
        # some computations over df
        df = client.persist(df)
        # add new records from df to explored
        # some computations over explored
        explored = client.persist(explored)

Writing the current df to disk and reading it back immediately (this works very well in Spark, but not so well in Dask, because it appends the data, and if the directory is removed it fails). In this case I tried both del df and client.cancel(df) (not at the same time); they had almost no effect on the computation time, so I decided to comment them out.

    while i < n_nodes:
        i += 1
        # some computations over df
        os.system('rm -r temp_dir/df_vuelos_dask')
        df.to_parquet('temp_dir/df_vuelos_dask')
        # del df
        # client.cancel(df)
        df = dd.read_parquet('temp_dir/df_vuelos_dask')
        # add new records from df to explored
        # some computations over explored
        os.system('rm -r temp_dir/explored')
        explored.to_parquet('temp_dir/explored')
        # del explored
        # client.cancel(explored)
        explored = dd.read_parquet('temp_dir/explored')
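For reference, the same round-trip without the rm call would look roughly like this, assuming a dask version where to_parquet accepts the overwrite keyword (I have not verified this on my setup):

    while i < n_nodes:
        i += 1
        # some computations over df
        # let dask clear the previous contents instead of removing the directory
        df.to_parquet('temp_dir/df_vuelos_dask', overwrite=True)
        df = dd.read_parquet('temp_dir/df_vuelos_dask')
        # add new records from df to explored
        # some computations over explored
        explored.to_parquet('temp_dir/explored', overwrite=True)
        explored = dd.read_parquet('temp_dir/explored')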

Using client.restart(). This one is no good, because it wipes out the contents of both df and explored.

    while i < n_nodes:
        i += 1
        # some computations over df
        os.system('rm -r temp_dir/df_vuelos_dask')
        df.to_parquet('temp_dir/df_vuelos_dask')
        client.restart()
        df = dd.read_parquet('temp_dir/df_vuelos_dask')
        # add new records from df to explored
        # some computations over explored
        os.system('rm -r temp_dir/explored')
        explored.to_parquet('temp_dir/explored')
        client.restart()
        explored = dd.read_parquet('temp_dir/explored')

Here is the elapsed-time output (in seconds) printed to the console:

    Iteration 2 / 280. Elapsed time: 407.85055565834045
    Iteration 3 / 280. Elapsed time: 434.58717703819275
    Iteration 4 / 280. Elapsed time: 436.2463436126709
    Iteration 5 / 280. Elapsed time: 437.9837713241577
    Iteration 6 / 280. Elapsed time: 440.2417469024658
    Iteration 7 / 280. Elapsed time: 442.7933940887451
    Iteration 8 / 280. Elapsed time: 445.7904782295227
    Iteration 9 / 280. Elapsed time: 449.1104226112366
    Iteration 10 / 280. Elapsed time: 452.3273584842682
    Iteration 11 / 280. Elapsed time: 456.3567247390747
    Iteration 12 / 280. Elapsed time: 460.65562629699707
    Iteration 13 / 280. Elapsed time: 464.7628743648529
    Iteration 14 / 280. Elapsed time: 469.59177350997925
    Iteration 15 / 280. Elapsed time: 474.6557366847992
    Iteration 16 / 280. Elapsed time: 479.7272925376892
    Iteration 17 / 280. Elapsed time: 485.53346991539
    Iteration 18 / 280. Elapsed time: 491.11691975593567
    Iteration 19 / 280. Elapsed time: 497.39954662323
    Iteration 20 / 280. Elapsed time: 504.03624844551086
    Iteration 21 / 280. Elapsed time: 510.45858550071716
    Iteration 22 / 280. Elapsed time: 517.7796952724457
    Iteration 23 / 280. Elapsed time: 525.3149480819702
    Iteration 24 / 280. Elapsed time: 532.6355893611908
    Iteration 25 / 280. Elapsed time: 541.2597570419312
    Iteration 26 / 280. Elapsed time: 549.2841284275055
    Iteration 27 / 280. Elapsed time: 558.8050730228424
    Iteration 28 / 280. Elapsed time: 567.617687702179
    Iteration 29 / 280. Elapsed time: 577.8864963054657
    Iteration 30 / 280. Elapsed time: 587.5171909332275
    Iteration 31 / 280. Elapsed time: 598.4596126079559
    Iteration 32 / 280. Elapsed time: 608.7272901535034
    Iteration 33 / 280. Elapsed time: 620.6863214969635
    Iteration 34 / 280. Elapsed time: 631.9231634140015
    Iteration 35 / 280. Elapsed time: 643.090336561203
    Iteration 36 / 280. Elapsed time: 656.1529128551483
    Iteration 37 / 280. Elapsed time: 667.9437139034271
    Iteration 38 / 280. Elapsed time: 681.2613704204559
    Iteration 39 / 280. Elapsed time: 695.7434968948364
    Iteration 40 / 280. Elapsed time: 709.1406977176666
    Iteration 41 / 280. Elapsed time: 723.0397245883942
    Iteration 42 / 280. Elapsed time: 737.5559349060059
    Iteration 43 / 280. Elapsed time: 753.8705065250397
    distributed.utils_perf - WARNING - full garbage collections took 10% CPU time recently (threshold: 10%)
    Iteration 44 / 280. Elapsed time: 768.2957532405853
    distributed.utils_perf - WARNING - full garbage collections took 10% CPU time recently (threshold: 10%)
    Iteration 45 / 280. Elapsed time: 783.177583694458
    distributed.utils_perf - WARNING - full garbage collections took 10% CPU time recently (threshold: 10%)
    Iteration 46 / 280. Elapsed time: 798.720709323883
    distributed.utils_perf - WARNING - full garbage collections took 10% CPU time recently (threshold: 10%)
    Iteration 47 / 280. Elapsed time: 814.6071207523346
    distributed.utils_perf - WARNING - full garbage collections took 10% CPU time recently (threshold: 10%)
    Iteration 48 / 280. Elapsed time: 830.2278523445129
    distributed.utils_perf - WARNING - full garbage collections took 10% CPU time recently (threshold: 10%)
    Iteration 49 / 280. Elapsed time: 846.3982262611389
    distributed.utils_perf - WARNING - full garbage collections took 10% CPU time recently (threshold: 10%)
    Iteration 50 / 280. Elapsed time: 865.5728619098663
    Iteration 51 / 280. Elapsed time: 882.612627029419
    distributed.utils_perf - WARNING - full garbage collections took 10% CPU time recently (threshold: 10%)
    Iteration 52 / 280. Elapsed time: 900.9131906032562
    distributed.utils_perf - WARNING - full garbage collections took 11% CPU time recently (threshold: 10%)
    Iteration 53 / 280. Elapsed time: 919.1079332828522
    distributed.utils_perf - WARNING - full garbage collections took 11% CPU time recently (threshold: 10%)
    Iteration 54 / 280. Elapsed time: 937.6077470779419
    distributed.utils_perf - WARNING - full garbage collections took 11% CPU time recently (threshold: 10%)
    Iteration 55 / 280. Elapsed time: 957.1775703430176

I am running this locally on a laptop with 16 GB of RAM and 12 cores. The dataset is about 7 GB, stored as parquet.
I would appreciate any guidance on what I am doing wrong, or on a way to discard the graph operations that have already been completed.
Thanks!

hyrbngr7 1#

Your first solution (using persist) seems reasonable. The task count in the UI is cumulative (so it is not starting the computation from scratch each time; with 100 partitions it will simply increase in multiples of 100).
Here is the example I am working with:

    import dask.dataframe as dd
    from dask.distributed import Client
    import pandas as pd
    import numpy as np
    import time

    client = Client()
    client

    max_number_of_nodes = 35
    number_of_ties = 1_000

    # random toy edge list standing in for the graph
    network = pd.DataFrame(
        np.random.randint(max_number_of_nodes, size=(number_of_ties, 2)),
        columns=['source', 'target'])
    ddf = dd.from_pandas(network, npartitions=10)

    for i in range(1, 11):                  # start at 1 to avoid integer division by zero
        ddf = ddf[ddf['source'] // i != 5]  # drop some rows each iteration
        ddf = client.persist(ddf)           # materialize the shrunken dataframe
        time.sleep(1)
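If memory held by earlier iterations becomes a problem, a variant of the same loop (just a sketch, not tested against your workload) can explicitly wait on each persisted result with dask.distributed.wait and only then apply the next filter:

    from dask.distributed import wait

    for i in range(1, 11):
        ddf = ddf[ddf['source'] // i != 5]
        ddf = client.persist(ddf)  # kick off computation of this iteration's result
        wait(ddf)                  # block until it finishes before adding the next filter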
