python os.sched_getaffinity(0)与os.cpu_count()

kpbwa7wx  于 2023-08-02  发布在  Python
关注(0)|答案(3)|浏览(113)

所以,我知道标题中两种方法的区别,但不知道实际含义。
据我所知:如果您使用的NUM_WORKERS比实际可用的内核多,您将面临巨大的性能下降,因为您的操作系统不断地来回切换,试图保持并行。我不知道这有多真实,但我在某个地方从一个比我聪明的人那里读到的。
os.cpu_count()的文档中,它说:

  • 返回系统中的CPU数量。如果未确定,则返回None。此数量不等于当前进程可以使用的CPU数量。可用CPU的数量可以通过len(os.sched_getaffinity(0))* 获得

所以,我试图弄清楚“系统”指的是什么,如果一个进程可以使用比“系统”中更多的CPU。
我只想安全高效地实现multiprocessing.pool功能。我的问题总结如下:

有哪些实际意义:

NUM_WORKERS = os.cpu_count() - 1
# vs.
NUM_WORKERS = len(os.sched_getaffinity(0)) - 1

字符串
使用-1是因为我发现,如果我在处理数据时尝试工作,系统的滞后性会大大降低。

jaxagkaj

jaxagkaj1#

这两个函数是非常不同的,NUM_WORKERS = os.sched_getaffinity(0) - 1会立即失败,因为你试图从一个集合中减去一个整数。虽然os.cpu_count()告诉你系统有多少个核心,但os.sched_getaffinity(pid)告诉你某个线程/进程可以在哪些核心上运行。

os.cpu_count()

os.cpu_count()显示操作系统已知的可用内核数量(* 虚拟 * 内核)。您很可能拥有此数量的一半 * 物理 * 内核。如果使用比物理核更多的进程,甚至比虚拟核更多的进程是有意义的,这在很大程度上取决于你正在做什么。计算循环越紧密(指令的多样性越小,缓存未命中越少......),您就越有可能无法从更多使用的内核(通过使用更多的工作进程)中受益,甚至无法体验性能下降。
显然,它还取决于系统正在运行的其他内容,因为系统试图在可用内核上为系统中的每个线程(作为进程的实际执行单元)提供公平的运行时份额。因此,就你应该使用多少工人而言,没有可能的概括。但是,如果您有一个紧密的循环,并且您的系统处于空闲状态,那么优化的一个好的起点是

os.cpu_count() // 2 # same as mp.cpu_count() // 2

字符串
……并从那里增加。
@Frank Yellin已经提到,multiprocessing.Pool使用os.cpu_count()作为默认的工作者数量。

os.sched_getaffinity(pid)

os.sched_getaffinity(pid)
返回PID为pid的进程(如果为零,则返回当前进程)所限制的CPU集。
现在core/cpu/processor/-affinity是关于你的线程(在你的worker-process中)被允许在哪些具体的(虚拟的)核心上运行。你的操作系统给每个内核一个id,从0到(number-of-cores - 1),并且改变关联允许限制(“固定”)允许某个线程在哪个(哪些)实际内核上运行。
至少在Linux上,我发现这意味着如果当前没有可用的内核,子进程的线程将不会运行,即使其他不允许的内核处于空闲状态。所以“亲和力”在这里有点误导。
摆弄关联性的目标是最小化上下文切换和核心迁移导致的缓存无效。你的操作系统通常有更好的洞察力,并且已经尝试用它的调度策略保持缓存“热”,所以除非你知道你在做什么,否则你不能指望从干扰中获得轻松的收益。
默认情况下,亲和性设置为所有内核,对于multiprocessing.Pool,更改它没有太大意义,至少如果您的系统处于空闲状态。
请注意,尽管这里的文档提到了“进程”,但设置亲和力实际上是针对每个线程的。因此,例如,在“子”线程中为“如果为零则为当前进程”设置亲和性不会改变进程内的主线程或其他线程的亲和性。* 但是 *,子线程从主线程继承其亲和性,子进程(通过其主线程)从父进程的主线程继承亲和性。这会影响所有可能的start-method(“spawn”,“fork”,“forkserver”)。下面的示例演示了这一点以及如何使用multiprocessing.Pool修改亲和力。

import multiprocessing as mp
import threading
import os

def _location():
    return f"{mp.current_process().name} {threading.current_thread().name}"

def thread_foo():
    print(f"{_location()}, affinity before change: {os.sched_getaffinity(0)}")
    os.sched_setaffinity(0, {4})
    print(f"{_location()}, affinity after change: {os.sched_getaffinity(0)}")

def foo(_, iterations=200e6):

    print(f"{_location()}, affinity before thread_foo:"
          f" {os.sched_getaffinity(0)}")

    for _ in range(int(iterations)):  # some dummy computation
        pass

    t = threading.Thread(target=thread_foo)
    t.start()
    t.join()

    print(f"{_location()}, affinity before exit is unchanged: "
          f"{os.sched_getaffinity(0)}")

    return _

if __name__ == '__main__':

    mp.set_start_method("spawn")  # alternatives on Unix: "fork", "forkserver"

    # for current process, exclude cores 0,1 from affinity-mask
    print(f"parent affinity before change: {os.sched_getaffinity(0)}")
    excluded_cores = {0, 1}
    os.sched_setaffinity(0, os.sched_getaffinity(0).difference(excluded_cores))
    print(f"parent affinity after change: {os.sched_getaffinity(0)}")

    with mp.Pool(2) as pool:
        pool.map(foo, range(5))


输出量:

parent affinity before change: {0, 1, 2, 3, 4, 5, 6, 7}
parent affinity after change: {2, 3, 4, 5, 6, 7}
SpawnPoolWorker-1 MainThread, affinity before thread_foo: {2, 3, 4, 5, 6, 7}
SpawnPoolWorker-2 MainThread, affinity before thread_foo: {2, 3, 4, 5, 6, 7}
SpawnPoolWorker-1 Thread-1, affinity before change: {2, 3, 4, 5, 6, 7}
SpawnPoolWorker-1 Thread-1, affinity after change: {4}
SpawnPoolWorker-1 MainThread, affinity before exit is unchanged: {2, 3, 4, 5, 6, 7}
SpawnPoolWorker-1 MainThread, affinity before thread_foo: {2, 3, 4, 5, 6, 7}
SpawnPoolWorker-2 Thread-1, affinity before change: {2, 3, 4, 5, 6, 7}
SpawnPoolWorker-2 Thread-1, affinity after change: {4}
SpawnPoolWorker-2 MainThread, affinity before exit is unchanged: {2, 3, 4, 5, 6, 7}
SpawnPoolWorker-2 MainThread, affinity before thread_foo: {2, 3, 4, 5, 6, 7}
SpawnPoolWorker-2 Thread-2, affinity before change: {2, 3, 4, 5, 6, 7}
SpawnPoolWorker-2 Thread-2, affinity after change: {4}
SpawnPoolWorker-2 MainThread, affinity before exit is unchanged: {2, 3, 4, 5, 6, 7}
SpawnPoolWorker-2 MainThread, affinity before thread_foo: {2, 3, 4, 5, 6, 7}
SpawnPoolWorker-1 Thread-2, affinity before change: {2, 3, 4, 5, 6, 7}
SpawnPoolWorker-1 Thread-2, affinity after change: {4}
SpawnPoolWorker-1 MainThread, affinity before exit is unchanged: {2, 3, 4, 5, 6, 7}
SpawnPoolWorker-2 Thread-3, affinity before change: {2, 3, 4, 5, 6, 7}
SpawnPoolWorker-2 Thread-3, affinity after change: {4}
SpawnPoolWorker-2 MainThread, affinity before exit is unchanged: {2, 3, 4, 5, 6, 7}

e37o9pze

e37o9pze2#

如果你有一个纯100% CPU绑定的任务,即除了计算什么都不做,那么很明显,如果进程池的大小大于计算机上可用的CPU数量,那么什么也得不到。但是,如果有一个混合的I/O抛出,其中一个进程将放弃CPU等待I/O完成(或者,例如,从网站返回的URL,这需要相对较长的时间)?对我来说,在这个场景中,进程池大小超过os.cpu_count()是否就不能提高吞吐量,这一点并不清楚。

更新

下面是演示这一点的代码。这段代码使用的是进程,最好使用线程。我的桌面上有8个核心。该程序简单地同时检索54个URL(或者在这种情况下并行地)。向程序传递一个参数,即要使用的池的大小。不幸的是,创建额外的进程会产生初始开销,因此如果创建太多进程,节省的开销就会开始减少。但是如果任务长时间运行并且有很多I/O,那么创建进程的开销最终是值得的:

from concurrent.futures import ProcessPoolExecutor, as_completed
import requests
from timing import time_it

def get_url(url):
    resp = requests.get(url, headers={'user-agent': 'my-app/0.0.1'})
    return resp.text

@time_it
def main(poolsize):
    urls = [
        'https://ibm.com',
        'https://microsoft.com',
        'https://google.com',
        'https://ibm.com',
        'https://microsoft.com',
        'https://google.com',
        'https://ibm.com',
        'https://microsoft.com',
        'https://google.com',
        'https://ibm.com',
        'https://microsoft.com',
        'https://google.com',
        'https://ibm.com',
        'https://microsoft.com',
        'https://google.com',
        'https://ibm.com',
        'https://microsoft.com',
        'https://google.com',
        'https://ibm.com',
        'https://microsoft.com',
        'https://google.com',
        'https://ibm.com',
        'https://microsoft.com',
        'https://google.com',
        'https://ibm.com',
        'https://microsoft.com',
        'https://google.com',
        'https://ibm.com',
        'https://microsoft.com',
        'https://google.com',
        'https://ibm.com',
        'https://microsoft.com',
        'https://google.com',
        'https://ibm.com',
        'https://microsoft.com',
        'https://google.com',
        'https://ibm.com',
        'https://microsoft.com',
        'https://google.com',
        'https://ibm.com',
        'https://microsoft.com',
        'https://google.com',
        'https://ibm.com',
        'https://microsoft.com',
        'https://google.com',
        'https://ibm.com',
        'https://microsoft.com',
        'https://google.com',
        'https://ibm.com',
        'https://microsoft.com',
        'https://google.com',
        'https://ibm.com',
        'https://microsoft.com',
        'https://google.com',
    ]
    with ProcessPoolExecutor(poolsize) as executor:
        futures = {executor.submit(get_url, url): url for url in urls}
        for future in as_completed(futures):
            text = future.result()
            url = futures[future]
            print(url, text[0:80])
            print('-' * 100)

if __name__ == '__main__':
    import sys
    main(int(sys.argv[1]))

字符串
8道工序:(我拥有的核心数量):

func: main args: [(8,), {}] took: 2.316840410232544 sec.


16道工序:

func: main args: [(16,), {}] took: 1.7964842319488525 sec.


24道工序:

func: main args: [(24,), {}] took: 2.2560818195343018 sec.

zhte4eai

zhte4eai3#

多处理池的实现使用

if processes is None:
    processes = os.cpu_count() or 1

字符串
不知道这是否回答了你的问题,但至少这是一个数据点。

相关问题