Python队列 线程池 进程池 基本概念以及使用方法

x33g5p2x  于2021-09-19 转载在 Python  
字(6.5k)|赞(0)|评价(0)|浏览(462)

线程、进程概念

算是对上一篇文章的回顾,具体细节参见上文。

概念

进程:一个正在运行的应用程序就是一个进程。一个进程是运行在其专用且受保护的内存空间中

线程:一个进程要执行任务必须要有线程。

进程 — 车间 线程 — 车间工人

线程的特点:一个线程执行多个任务是串行执行的

多线程:一个进程中有多个线程。多线程可以并行(同时)执行多个任务

多线程原理:多线程技术是通过利用CPU空闲时间干活来提高程序执行效率

多线程

一个应用程序默认对应一个进程,这个进程(主进程)中默认有一个线程(主线程)

使用方法
  1. from threading import Thread

1)、直接使用Thread类

线程对象 = Thread(target = 函数,args = 参数对应的元组)

线程对象.start()

线程对象.join()

2)、使用Thread类的子类

class 线程子类(Thread):

​ def _ init_(self):

​ super()._ init_()

​ 实现线程对象任务需要的额外数据对应的属性

​ def run(self) -> None:

​ 线程任务

  1. from multiprocessing import Process

3)、进程

进程对象 = Thread(target = 函数,args = 参数对应的元组)

进程对象.start()

进程对象.join()

队列

线程队列
  1. from queue import Queue

Queue模块中的队列,只能保存一般数据或者多线程中产生的数据(多用于多线程,自带线程安全属性)
但是不能用来存储多进程产生的数据

  1. if __name__ == '__main__':
  2. # 1、队列基本用法
  3. # 1)、创建队列对象
  4. queue = Queue()
  5. # 2)、添加数据(进) --- 队列对象.put(数据)
  6. queue.put(9)
  7. queue.put(99)
  8. # 3)、获取数据(出) --- 队列对象.get()
  9. print(f'队列queue中元素个数:{queue.qsize()}')
  10. print(queue.get())
  11. print(f'队列queue中元素个数:{queue.qsize()}')
  12. print(queue.get())
  13. print(f'队列queue中元素个数:{queue.qsize()}')
  14. # 4)、获取队列中元素个数:队列对象.qsize()
  15. # 5)、通过get获取数据的时候如果队列中没有数据,get方法会等待,直到队列有数据或超时
  16. print(queue.get(timeout=5))
队列在线程中的使用方法
  1. from threading import Thread
  2. from queue import Queue,Empty
  3. import time
  4. from random import randint

模拟电影下载函数:

  1. def download(movie_name):
  2. print(f'{movie_name}开始下载')
  3. time.sleep(randint(1,5))
  4. print(f'{movie_name}下载完成')
  5. queue.put(f'{movie_name}数据')

处理数据函数:

  1. def deal_data():
  2. while(True):
  3. data = queue.get()
  4. if(data == 'End'):
  5. break
  6. print(f'处理{data}')
  1. if __name__ == '__main__':
  2. queue = Queue()
  3. # 创建子线程处理数据(方式三对应的代码)
  4. deal_thread = Thread(target=deal_data)
  5. deal_thread.start()
  6. threads = []
  7. names = [f'电影{x}' for x in range(1,11)]
  8. for name in names:
  9. thread = Thread(target=download,args=(name,))
  10. thread.start()
  11. threads.append(thread)
  12. # 获取队列数据方式1:能做到子线程得到数据主线程能立马处理数据,但数据处理完程序无法结束
  13. # while(True):
  14. # data = queue.get()
  15. # print(f'处理{data}数据')
  16. # 获取队列数据方式2:能做到子线程得到数据主线程能立马处理数据,通过超时来判断数据是否处理完成
  17. # while(True):
  18. # try:
  19. # data = queue.get(timeout=5)
  20. # print(f'处理{data}数据')
  21. # except Empty:
  22. # break
  23. # 3、在所有下载数据的线程都结束的时候在队列中添加结束标记,在子线程去获取队列来处理
  24. for t in threads:
  25. t.join()
  26. queue.put('End')

定义全局变量可以在任何一个线程中使用。

进程队列

全局变量无法解决数据的跨进程使用,但可以采用返回值
1)、基本操作

​ 创建队列对象:Queue()

​ 添加数据:队列对象.put(数据)

​ 获取数据:队列对象.get() / 队列对象.get(timeout = 超时时间)

2)、注意事项

如果想要使用一个队列对象获取不同进程

  1. from multiprocessing import Process,Queue,current_process
  2. import time
  3. from random import randint
  1. def download(movie_name,q:Queue):
  2. print(f'{movie_name}开始下载')
  3. time.sleep(randint(1,5))
  4. print(f'{movie_name}下载完成')
  5. q.put(f'{movie_name}')
  6. print(current_process())
  7. def deal_data(q:Queue):
  8. while(True):
  9. data = q.get()
  10. if(data == 'End'):
  11. break
  12. print(f'处理{data}')
  13. print(current_process())
  1. if __name__ == '__main__':
  2. queue = Queue()
  3. process = Process(target=download,args=('电影1',queue))
  4. process.start()
  5. process.join()
  6. print(queue.get())

练习:使用多个进程同时下载多个电影,将下载的电影数据保存到队列中,在一个新的进程中去处理下载到的数据

做到:边下载边处理,下载完就处理完,处理完程序马上结束

  1. processes = []
  2. names = [f'电影{x}' for x in range(1,11)]
  3. for name in names:
  4. p = Process(target=download,args=(name,queue))
  5. p.start()
  6. processes.append(p)
  7. p1 = Process(target=deal_data,args=(queue,))
  8. p1.start()
  9. for p in processes:
  10. p.join()
  11. queue.put('End')

线程池

  1. import time
  2. from concurrent.futures import ThreadPoolExecutor
  3. from random import randint
  4. from threading import current_thread
  1. def download(movie_name):
  2. print(f'{movie_name}开始下载')
  3. time.sleep(randint(1,5))
  4. print(f'{movie_name}下载完成')
  5. print(current_thread())

线程池的工作原理 : 提前创建指定个数的线程,保存到一个线程池中

​ 然后再往线程池中添加若干个任务,线程池自动为线程分配任务

  1. 创建线程池,确定线程池中线程数量
  1. pool = ThreadPoolExecutor(max_workers = 10)
  1. 往线程池中添加任务
  1. names = [f'电影{x}' for x in range(1,101)]

​ 1)、一次添加一个任务
​ 线程池.submit(函数,参数1,参数2…)

  1. for name in names:
  2. pool.submit(download,name)

​ 2)、一次性添加多个任务
​ 线程.map(函数,包含所有任务的参数的序列)

  1. pool.map(download,names)

​ 3)、关闭线程池

​ 线程池.shutdown() — 关闭线程池以后,线程池无法再添加任务,但是不影响已经添加的任务的执 行

  1. pool.shutdown()
线程池的使用及功能
  1. def download(movie_name,x):
  2. print(f'{movie_name}-{x}开始下载')
  3. time.sleep(randint(1, 5))
  4. print(f'{movie_name}-{x}下载完成')
  5. return movie_name
  1. if __name__ == '__main__':
  2. # 1、创建线程池
  3. pool = ThreadPoolExecutor(max_workers = 30)
  4. # 2、添加任务
  5. # 线程池.submit(函数) --- 函数可以是有任意多个参数的函数;返回值是一个可操作的future对象
  6. # 线程池.map(函数) --- 函数只能为有且只有一个参数的函数;返回值没法控制和操作
  7. pool.submit(download, '天若有情', 9)
  8. all_movies = [pool.submit(download, f'电影{x}', x * 9) for x in range(1,100)]
  9. # 3、等待任务完成
  10. # wait(all_movies,return_when = ALL_COMPLETED)
  11. # print('所有电影下载完成!')
  12. # 4、获取任务函数的返回值
  13. for movie in as_completed(all_movies):
  14. print(f'--------------------{movie.result()}--------------------')

进程池

  1. import time
  2. from multiprocessing import Pool
  3. from random import randint
  1. def download(movie_name):
  2. print(f'{movie_name}开始下载')
  3. time.sleep(randint(1, 5))
  4. print(f'{movie_name}下载完成')
  5. return movie_name

1)、一次添加一个任务

a.进程池对象.apply(函数, 参数) — 同步/串行(一个一个执行);进程池中的多个任务串行执行

b.进程池对象.apply_async(函数, 参数) — 异步/并行;必须配合close()、join()一起使用

函数 - 任务对应的函数的函数名

参数 - 元组;调用任务函数的时候的实参,需要多少个实参,元祖中就有多少个元素

2)、一次添加多个任务

进程池对象.map(函数,参数序列) — 序列中有多少个元素就添加多少个任务;

​ 进程池中的任务并行,进程池中的任务和主进程串行

进程池.map_async(函数,参数序列) — 进程池中的任务和主程序并行执行

map 和 map_async的返回值是所有任务对应的函数的返回值

  1. if __name__ == '__main__':
  2. pool = Pool(4)
  3. # -------------------------------------------------------------------
  4. # pool.apply(download, ('天若有情',))
  5. #
  6. # for movie in range(20):
  7. # pool.apply_async(download, (f'电影{movie}',))
  8. # result = pool.map_async(download,[f'电影{x}' for x in range(20)])
  9. result = pool.map_async(download,[f'电影{x}' for x in range(20)])
  10. #--------------------------------------------------------------------
  11. # 3、关闭进程池阻止向进程池内添加任务
  12. pool.close()
  13. # 4、等待进程池内的任务全都结束
  14. pool.join()
  15. print(result.get()) #map_async
  16. print(result) #map
线程池的使用 — 51job爬取

网页数据获取函数

  1. def get_one_page(page:int):
  2. headers = {
  3. 'User-Agent':'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/93.0.4542.2 Safari/537.36'
  4. }
  5. url = f'https://search.51job.com/list/090200,000000,0000,00,9,99,%25E6%2595%25B0%25E6%258D%25AE%25E5%2588%2586%25E6%259E%2590,2,{page}.html?lang=c&postchannel=0000&workyear=99&cotype=99&degreefrom=99&jobterm=99&companysize=99&ord_field=0&dibiaoid=0&line=&welfare='
  6. response = requests.get(url, headers = headers)
  7. # return response.text
  8. # 正则选取网页中接口数据
  9. json = findall(r'window.__SEARCH_RESULT__ = (.+?)</script>',response.text)[0]
  10. # return json
  11. # json数据转python字典数据
  12. messages = loads(json)
  13. # return messages
  14. job_message = []
  15. # 遍历字典中'engine_search_result'列表将岗位信息添加到job_message中
  16. for result in messages['engine_search_result']:
  17. job_message.append({
  18. '岗位名称':result['job_name'],
  19. '公司名称': result['company_name'],
  20. '公司详情网址': result['company_href'],
  21. '薪资待遇': result['providesalary_text'],
  22. '工作地点': result['workarea_text'],
  23. '公司类型': result['companytype_text'],
  24. '福利待遇': result['jobwelf'],
  25. '公司方向': result['companyind_text']
  26. })
  27. return job_message

创建线程池

  1. pool = ThreadPoolExecutor(max_workers=20)
  2. all_jobs = []

添加任务

  1. for page in range(1,176):
  2. all_jobs.append(pool.submit(get_one_page,page))

在主线程中写入保存数据(此时主线程空闲)

  1. file = open('files/jobs.csv','a',encoding='utf-8')
  2. writer = csv.DictWriter(file,['岗位名称','公司名称','公司详情网址','薪资待遇','工作地点','公司类型','福利待遇','公司方向'])
  3. writer.writeheader()
  4. for job in as_completed(all_jobs):
  5. writer.writerows(job.result())

获取到的部分数据截图:

相关文章