python-3.x: downloading from the web using threads (inside a separate process) and multiprocessing queues

cgvd09ve · posted 2023-03-04 in Python

I have the following class that downloads from the web:

import os
from http.client import HTTPResponse
from threading import Lock, Thread
import urllib.request as urlrequest


class Downloader:

    SENTINEL = END_QUEUE_SENTINEL

    def __init__(self, to_download, downloaded):
        self.to_download = to_download
        self.downloaded = downloaded
        self.mutex = Lock()
        self._stop_workers = False

    @staticmethod
    def _write_to_file(path, content):
        os.makedirs(os.path.dirname(path), exist_ok=True)
        with open(path, "wb") as f:
            f.write(content)

    def _set_item(self, response: HTTPResponse, item):
        if response.status == 200:
            self._write_to_file(item.download_path, content=response.read())
            return item

    def _download_item(self):
        while not self._stop_workers:
            item = self.to_download.get()
            if item == self.SENTINEL:
                print("sentinel received")
                self.mutex.acquire()
                self._stop_workers = True
                print("self._stop_workers becomes True")
                self.mutex.release()
                print("mutex_released")
                print(self.to_download.qsize())
                break
            req = urlrequest.Request(item.url)
            response = urlrequest.urlopen(req)
            item = self._set_item(response, item)
            self.downloaded.put(item)

    def download_items(self, download_workers=50):
        threads = [Thread(target=self._download_item) for _ in range(download_workers)]
        for t in threads:
            t.start()
        for t in threads:
            t.join()
        print("workers stopped")
        self._stop_workers = False
        self.downloaded.put(self.SENTINEL)

to_download and downloaded are multiprocessing queues.
Another process is adding items to the to_download queue.

downloader = Downloader(to_download, downloaded)
processes = [Process(target=add_item_to_download),
             Process(target=downloader.download_items,
                     kwargs={"download_workers": 10})]

for p in processes:
    p.start()

for p in processes:
    p.join()
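For context, a minimal sketch of what a producer like add_item_to_download might look like (its body is not shown in the question; in the original it presumably reads the queue from an enclosing scope, while here it is passed explicitly, and the Item class, URLs, paths, and sentinel value are all made up):

```python
import multiprocessing as mp
from dataclasses import dataclass

END_QUEUE_SENTINEL = "__END__"  # assumed sentinel value


@dataclass
class Item:
    url: str
    download_path: str


def add_item_to_download(to_download: mp.Queue) -> None:
    # Hypothetical producer: enqueue the work items, then the sentinel
    # so the consumer knows no more work is coming.
    items = [
        Item("https://example.com/a", "./a.html"),
        Item("https://example.com/b", "./b.html"),
    ]
    for item in items:
        to_download.put(item)
    to_download.put(END_QUEUE_SENTINEL)
```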

item is an object with two attributes (url and download_path).
After a while, the first process sends the sentinel to the Downloader. Once the sentinel is received, the following is printed: "sentinel received", "self._stop_workers becomes True", "mutex_released", "0" (the qsize).
Then nothing happens: the process/threads hang, and even debugging with PyCharm I cannot find out why this happens.
I expected the threads to finish joining, print "workers stopped", and put the sentinel on the downloaded queue.


8e2ybdfx (answer 1)

A couple of things. Since there are no type hints, I'm assuming self.to_download is iterable and can therefore be looped over with a for loop.
Also, this looks like it sends HTTP requests with urllib, and there is room for improvement there:
urllib supports streaming a download directly to a file with urlretrieve, which can save memory depending on the download size.
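For illustration, urlretrieve(url, filename) streams the response body straight to a file path (a file:// URL is used here so the sketch runs without network access; the file names are made up):

```python
import pathlib
import tempfile
import urllib.request as urlrequest

# Create a small source file and "download" it via a file:// URL.
src = pathlib.Path(tempfile.mkdtemp()) / "source.html"
src.write_bytes(b"<html>hello</html>")

dest = src.with_name("copy.html")
# urlretrieve streams the body to `filename` instead of holding
# the whole payload in memory the way response.read() does.
path, headers = urlrequest.urlretrieve(src.as_uri(), str(dest))

print(dest.read_bytes())  # b'<html>hello</html>'
```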
A Process pool suits CPU-bound tasks that need true parallelism; looping over network calls is I/O-bound, so I suggest a thread pool for this problem. I used ThreadPoolExecutor, which wraps Thread. If you really do need true parallelism rather than just concurrency, use ProcessPoolExecutor, which wraps Process, but be aware that memory sharing is limited when using a process pool.
Here's an example:

import urllib.request as urlrequest
from concurrent.futures import ThreadPoolExecutor, Future, wait
import typing as T

# quick hacky Item class with constructor
Item = lambda url, download_path: \
    type('Item', (), {'url': url, 'download_path': download_path})

# Our ThreadPoolExecutor now manages thread state for us
# No need for a class to track state any more, a pure function will do
def download_items(items: T.Iterable, download_workers: int = 50):
    # Allocate a thread pool with download_workers threads
    thread_pool: ThreadPoolExecutor = ThreadPoolExecutor(max_workers=download_workers)
    # Any mutable iterable structure will work here (I chose a list)
    threads: T.List[Future] = []
    for item in items:
        # Submit our urlretrieve calls to the thread pool
        threads.append(thread_pool.submit(urlrequest.urlretrieve, item.url, item.download_path))
    # Block main until all threads complete
    wait(threads)
    print("All threads complete")

if __name__ == '__main__':
    urls: T.Tuple[str, ...] = ("https://stackoverflow.com", "https://google.com", "https://reddit.com")
    paths: T.Tuple[str, ...] = ("./stackoverflow.html", "./google.html", "./reddit.html")
    items = (Item(url, path) for url, path in zip(urls, paths))
    download_items(items)
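As an aside, the hang described in the question can be reproduced with a plain queue.Queue and no networking at all: a single sentinel wakes only one of the worker threads, while the others are already blocked inside get() and never re-check the _stop_workers flag, so join() never returns. A minimal sketch (the thread count, sentinel, and timeouts here are made up for the demo):

```python
import queue
import threading
import time

SENTINEL = object()
q = queue.Queue()
stop = False


def worker():
    global stop
    while not stop:
        item = q.get()       # all workers block here waiting for work
        if item is SENTINEL:
            stop = True      # only the thread that received the sentinel exits
            break


threads = [threading.Thread(target=worker, daemon=True) for _ in range(3)]
for t in threads:
    t.start()
time.sleep(0.5)              # let all three workers block inside q.get()

q.put(SENTINEL)              # one sentinel for three workers
for t in threads:
    t.join(timeout=1.0)      # timeout so this demo itself does not hang

still_running = sum(t.is_alive() for t in threads)
print(still_running)  # 2 -- two workers remain stuck in q.get()
```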
