python 使用multiprocessing将阵列拆分为多个部分时显示总进度条

v09wglhw  于 2023-05-21  发布在  Python
关注(0)|答案(2)|浏览(113)

我有一个代码,我正在计算一个数组whislt显示进度条的平均值。
下面是我的代码:

import numpy as np
from tqdm import tqdm
import multiprocessing as mtp

def split_list(some_list, size):
  for i in range(0, len(some_list), size):
      yield some_list[i:i + size]  

def foo(x):
    return np.square(x)

def main():
    a = np.arange(1000)
    av = 0
    with mtp.Pool(4) as pool:
        for l in split_list(a, 33):
            output = list(tqdm(pool.imap(foo, l), total = len(a)))
            av += np.sum(output)
        av /= len(a)
        
    return av

这显示了几个进度条。我只想有一个与整体的进展。
我该怎么做?
编辑:
我有一个可以用sudo的东西

def main():
    a = [np.random.rand(10, 3, 3) for _ in range(1000)]
    av = 0
    
    with mtp.Pool(4) as pool:
      with tqdm.tqdm(total = len(a)) as p_bar:
        for l in split_list(a, 33):
            output = list(pool.imap(foo, l))
            av += np.sum(output)
            p_bar.update(len(l))

    av /= len(a)
    return av

但它只会在大块更新,而不是因为结果是可用的。

um6iljoc

um6iljoc1#

您的split_list函数正在创建一些长度为33的子列表(可能最后一个子列表除外)。因此,当每个子列表被处理时,你需要将进度条前进刚刚处理的子列表的长度,并且你需要将 * 所有 * 子列表作为 * 一个 * 对imap_unordered的调用来处理(这比imap更快,并且对于计算平方的平均值,你不关心结果返回的顺序):

import numpy as np
from tqdm import tqdm
import multiprocessing as mtp

def split_list(some_list, size):
  for i in range(0, len(some_list), size):
      yield some_list[i:i + size]

def foo(x):
    return np.square(x)

def main():
    a = np.arange(1000)
    the_sum = 0
    with mtp.Pool(4) as pool, tqdm(total=len(a)) as pbar:
        for result in pool.imap_unordered(foo, split_list(a, 33)):
            the_sum += np.sum(result)
            pbar.update(len(result))
    av = the_sum / len(a)

    return av

if __name__ == '__main__':
    print(main())

图纸:

100%|██████████████████████████████████████████████████████████████████████████████████████████████████████████| 1000/1000 [00:00<00:00, 3546.10it/s]
332833.5

更新

只要您自己创建块,并使用np.square返回块中每个元素的方块,我不知道如何避免以块的形式更新进度条。如果你想让一个大小为1000的进度条一次更新一个进度条,你必须执行1000个操作:

import numpy as np
from tqdm import tqdm
import multiprocessing as mtp

def foo(x):
    return x ** 2

def main():
    a = np.arange(1000)
    the_sum = 0
    with mtp.Pool(4) as pool, tqdm(total=len(a)) as pbar:
        for result in pool.imap_unordered(foo, a):
            the_sum += result
            pbar.update()
    av = the_sum / len(a)

    return av

if __name__ == '__main__':
    print(main())

备注

正如我在评论中指出的,我看到的增加大小为N的进度条的唯一方法是执行N个操作,并在每个操作完成时更新进度条。有几种方法可以做到这一点(我在上面展示了一种),但本质上它们都需要替换np.square,这是用一个显式循环来一次平方33个整数,该循环分别对33个整数中的每个整数进行平方。
如果您想要另一种方法,允许您向foo传递大小为33的子列表,并且每次将子列表的元素平方时仍然更新进度条,那么下面的代码将是一种方法。它包括向主进程返回一个“事件”,表示平方操作已经完成。下面我们使用multiprocessing.Queuefoo在完成平方操作时将一些任意项放入队列,主进程从队列中获取这些“事件”并更新进度条。
在这里,我们更新a的大小,使其具有1,000,000个元素:

import numpy as np
from tqdm import tqdm
import multiprocessing as mtp

def split_list(some_list, size):
  for i in range(0, len(some_list), size):
      yield some_list[i:i + size]

def pool_initializer(the_queue):
    global q

    q = the_queue

def foo(x):
    partial_sum = 0
    for elem in x:
        # convert np.int64 to int and then square it
        partial_sum += int(elem) ** 2
        q.put(None) # Value is not important
    return partial_sum

def main():
    a = np.arange(1_000_000, dtype=np.int64)
    the_sum = 0

    q = mtp.Queue()
    with mtp.Pool(4, initializer=pool_initializer, initargs=(q,)) as pool, \
    tqdm(total=len(a)) as pbar:
        it = pool.imap_unordered(foo, split_list(a, 33))
        for _ in range(len(a)):
            q.get()
            pbar.update()
        the_sum = sum(it)

    av = the_sum / len(a)
    return av

if __name__ == '__main__':
    print(main())

图纸:

100%|███████████████████████████████████████████████████████████████████████████████████████████████████| 1000000/1000000 [00:32<00:00, 31107.52it/s]
333332833333.5

与我建议的方法(经过适当的修改)相比,我建议的方法是按块报告:

import numpy as np
from tqdm import tqdm
import multiprocessing as mtp

def split_list(some_list, size):
  for i in range(0, len(some_list), size):
      yield some_list[i:i + size]

def foo(x):
    return np.square(x, dtype=np.int64)

def main():
    a = np.arange(1_000_000, dtype=np.int64)
    the_sum = 0
    with mtp.Pool(4) as pool, tqdm(total=len(a)) as pbar:
        for result in pool.imap_unordered(foo, split_list(a, 33)):
            the_sum += int(np.sum(result, dtype=np.int64))
            pbar.update(len(result))
    av = the_sum / len(a)

    return av

if __name__ == '__main__':
    print(main())

图纸:

100%|██████████████████████████████████████████████████████████████████████████████████████████████████| 1000000/1000000 [00:09<00:00, 109517.22it/s]
333332833333.5

**运行分别为32秒和9秒。报告块的速度几乎快了4倍。使用的块越大,性能就越好。如果我将数组大小增加到50,000,000(从1,000,000),但将块大小增加到10,000(从33),那么运行时间实际上从9秒减少到2秒(tqdm只是四舍五入到最近的秒的报告时间)。

kuhbmx9i

kuhbmx9i2#

我认为它只是放置“tqdm”类的地方。每次循环“for l in split_list(a,33)”输入新值时,都会生成一个新的进度条。
如果您使用的“multiprocessing”模块函数通过“print”或“LOG”打印消息,则此消息将中断进度条,并在消息下生成一个新的进度条,其中包含先前进度条的累积进度。
如果你只需要一个进度条,你应该把它放在“for”循环语句中,并检查打印的消息。
重要提示:我从来没有使用过多处理,我只是在谈论“tqdm”模块。

相关问题