我有下面的类从网上下载:
class Downloader:
SENTINEL = END_QUEUE_SENTINEL
def __init__(self, to_download, downloaded):
self.to_download = to_download
self.downloaded = downloaded
self.mutex = Lock()
self._stop_workers = False
@staticmethod
def _write_to_file(path, content):
os.makedirs(os.path.dirname(path), exist_ok=True)
with open(path, "wb") as f:
f.write(content)
def _set_item(self, response: HTTPResponse, item):
if response.status == 200:
self._write_to_file(item.download_path, content=response.read())
return item
def _download_item(self):
while not self._stop_workers:
item: = self.to_download.get()
if item == self.SENTINEL:
print("sentinel received")
self.mutex.acquire()
self._stop_workers = True
print("self._stop_workers becomes True")
self.mutex.release()
print("mutex_released")
print(self.to_download.qsize())
break
req = urlrequest.Request(item.url)
response = urlrequest.urlopen(req)
item = self._set_item(response, item)
self.downloaded.put(item)
def download_items(self, download_workers=50):
threads = [Thread(target=self._download_item) for _ in range(download_workers)]
for t in threads:
t.start()
for t in threads:
t.join()
print("workers stopped")
self._stop_workers = False
self.downloaded.put(self.SENTINEL)
to_download
和downloaded
是多处理队列。
另一个进程正在向to_download
队列添加数据。
downloader = Downloader(to_download,downloaded)
processes = [Process(target=add_item_to_download),
Process(target=downloader.download_items,
kwargs={"download_workers": 10})]
for p in processes:
p.start()
for p in processes:
p.join()
item
是具有2个属性(url和download_path)的对象。
一段时间后的第一个进程是将sentinel发送到Downloader
。收到sentinel后,打印以下内容:“sentinel接收到”“自身.停止工作线程变为True”“已释放互斥锁”“0”# qsize
然后什么也没发生,进程/线程挂起,即使使用Pycharm调试,我也找不到为什么会发生这种情况。
预期完成线程“join”并打印:““worker stopped”并将该标记添加到下载队列中。
1条答案
按热度按时间8e2ybdfx1#
有几件事,因为没有类型提示,我假设
self.to_download
是可迭代的,因此可以使用for
循环进行迭代。而且,这看起来像是用urllib发送http请求。这里也有一些改进。
urllib支持直接流式下载到具有
urlretrieve
的文件,这可以根据下载大小节省内存。Process
池适用于需要真正并行运行的CPU密集型任务,循环网络调用更像是I/O密集型任务,因此我建议使用Thread
池来解决此问题。我使用了ThreadPoolExecutor
,它 Package 了Thread
,如果您确实需要真正的并行线程,而不仅仅是并发性,请使用ProcessPoolExecutor
,它 Package 了Process
,但知道使用进程池时存储器共享不完全。示例如下: