并发编程(二)

x33g5p2x  于2022-02-07 转载在 其他  
字(4.8k)|赞(0)|评价(0)|浏览(243)


本文总结了:什么是僵尸进程、孤儿进程、如何守护进程、乐观锁和悲观锁概念、消息队列如何简易实现、IPC机制解决进程无法数据互通、以及生产者消费者模型👆

并发编程(二)

僵尸进程

正常:进程代码运行结束之后并没有直接结束而是需要等待回收子进程资源才能结束;

僵尸进程是当子进程比父进程先结束,而父进程又没有回收子进程,释放子进程占用的资源,此时子进程将成为一个僵尸进程。. 如果父进程先退出 ,子进程被init接管,子进程退出后init会回收其占用的相关资源;

通俗理解:如果我们出去露营,在回家的时候是不是需要把摆出来的行李收拾回去才可以呀,不能人嗨了东西不要了;

from multiprocessing import Process
import time

def process_test(name):
    print(f'{name} is running!')
    time.sleep(2)
    print(f'{name} is over!')

if __name__ == '__main__':
    p = Process(target=process_test,args=('Hammer',))
    p.start()
    print('主线程')

# 主进程执行了print输出语句已经没有代码可以执行了,但是子进程是在该进程内开设的,需要等待回收子进程的资源

孤儿进程

即主进程已经死亡(非正常)但是子进程还在运行

孤儿进程 一个父进程退出,而它的一个或多个子进程还在运行,那么那些子进程将成为孤儿进程。 孤儿进程将被init进程 (进程号为1)所收养,并由init进程对它们完成状态收集工作;

通俗的理解:“没爹没妈”

这样是浪费资源的,没有主进程回收子进程,最后会由操作系统linit回收;

守护进程

守护进程:即守护着某个进程 一旦这个进程结束那么也随之结束;

通俗理解:“进程陪葬”

from multiprocessing import Process
import time

def test(name):
    print('总管:%s is running' % name)
    time.sleep(3)
    print('总管:%s is over' % name)

if __name__ == '__main__':
    p = Process(target=test, args=('Hammer',))
    p.daemon = True  # 设置为守护进程(一定要放在start语句上方)
    p.start()
    print("皇帝寿终正寝")
# 只有主进程的print语句,而子进程的输出语句不会执行
# 当主进程结束了,子进程也随即结束,就相当于皇帝死了,太监们就别玩了,而是去陪葬

注意:p.daemon = True在启动子进程前运行!

互斥锁

问题:并发情况下操作同一份数据 极其容易造成数据错乱

解决措施:将并发变成串行 虽然降低了效率但是提升了数据的安全

锁(行锁、表锁···)就可以实现将并发变成串行的效果

'''data.txt'''
{"ticket_num":1}

'''buy.py'''
import json
from multiprocessing import Process, Lock
import time
import random

# 查票
def search(name):
    with open(r'data.txt', 'r', encoding='utf8') as f:
        data_dict = json.load(f)  # 反序列化
        ticket_num = data_dict.get('ticket_num')
        print('%s查询余票:%s' % (name, ticket_num))

# 买票
def buy(name):
    # 先查票
    with open(r'data.txt', 'r', encoding='utf8') as f:
        data_dict = json.load(f)
        ticket_num = data_dict.get('ticket_num')
    # 模拟一个延迟
    time.sleep(random.random())
    # 判断是否有票
    if ticket_num > 0:
        # 将余票减一
        data_dict['ticket_num'] -= 1
        # 重新写入数据库
        with open(r'data.txt', 'w', encoding='utf8') as f:
            json.dump(data_dict, f)
        print('%s: 购买成功' % name)
    else:
        print('不好意思 没有票了!!!')

def run(name,mutex):
    search(name)
    mutex.acquire()  # 抢锁
    buy(name)
    mutex.release()  # 释放锁
# 只需将买票上锁

# 开设多个进程
if __name__ == '__main__':
    mutex = Lock()  # 多进程如果不上锁就会出现数据错乱
    for i in range(1, 11):
        p = Process(target=run, args=('用户%s' % i,mutex))
        p.start()
        
# 抢锁的概念就相当于十个人去抢一个茅坑,一个人拉上了,后面的人只能等着了,拉完了(释放锁),别人才能继续抢锁;
        
# 在以后的编程生涯中 几乎不会解除到自己操作锁的情况

乐观锁与悲观锁

乐观锁

  • 总是假设最好的情况,每次去拿数据的时候都认为别人不会修改,所以不会上锁,但是在更新的时候会判断一下在此期间别人有没有去更新这个数据,可以使用版本号机制和CAS算法实现。乐观锁适用于多读的应用类型,这样可以提高吞吐量,像数据库提供的类似于write_condition机制,其实都是提供的乐观锁;
  • 乐观锁:假设不会发生并发冲突,只在提交操作时检查是否违反数据完整性;
  • 乐观锁:多数用于数据争用不大、冲突较少的环境中,这种环境中,偶尔回滚事务的成本会低于读取数据时锁定数据的成本,因此可以获得比其他并发控制方法更高的吞吐量。
    相对于悲观锁,在对数据库进行处理的时候,乐观锁并不会使用数据库提供的锁机制。一般的实现乐观锁的方式就是记录数据版本;

悲观锁

  • 总是假设最坏的情况,每次去拿数据的时候都认为别人会修改,所以每次在拿数据的时候都会上锁,这样别人想拿这个数据就会阻塞直到它拿到锁(共享资源每次只给一个线程使用,其它线程阻塞,用完后再把资源转让给其它线程,和Python中GIL锁一样)。传统的关系型数据库里边就用到了很多这种锁机制,比如行锁,表锁等,读锁,写锁等,都是在做操作之前先上锁;
  • 悲观锁:假定会发生并发冲突,屏蔽一切可能违反数据完整性的操作;
  • 悲观锁:主要用于数据争用激烈的环境,以及发生并发冲突时使用锁保护数据的成本要低于回滚事务的成本的环境中;

参考:https://blog.csdn.net/qq_34337272/article/details/81072874

消息队列

队列:先进先出

方法示例写在代码中

from multiprocessing import Queue

# 括号内可以填写最大的等待数,队列最多装5个数
q = Queue(5)

# put方法,存放数据
q.put(111)
q.put(222)
print(q.full())  # 判断队列是否放满
q.put(333)
q.put(444)
q.put(555)
print(q.full())  # 队列放满了

# 超出范围原地等待,直到有空缺位置
# q.put(666)

# 提取数据
print(q.get())
print(q.get())
print(q.get())
print(q.get())
print(q.get())

# 只有五个数,取六个的时候,原地等待,直到有数据过来
# print(q.get())

# 判断数据是否为空
print(q.empty())
# 获取数据,没有数据立刻报错
print(q.get_nowait())  # queue.Empty

注意:

  • fullget_nowait不能用于多进程情况下的精确使用,如果另外进程从当前进程拿数据,或者存入数据,会造成数据的偏差,从而full方法get_nowait方法判断就会出错;
  • 队列的使用就可以打破进程间默认无法通信的情况,这样就相当于开拓了一块公共空间(消息队列),每个进程都可以从中拿或存数据;

IPC机制

队列的使用就可以打破进程间默认无法通信的情况,IPC机制来实现

# 主进程和子进程通过消息队列来实现数据互通

# IPC机制来解决进程间无法通信的问题
from multiprocessing import Queue,Process

def consumer(q):
    print(q.get())  # 子进程获取主进程存放的数据
    q.put("子进程存放的数据")   # 模拟开设的子进程中存放的数据

if __name__ == '__main__':
    q = Queue()  # 生成队列
    q.put("主进程存放的数据") # 模拟数据
    # 开设子进程
    p = Process(target=consumer,args=(q,))
    p.start()
    p.join()  # 用join方法来确认子进程执行完,主进程再去取数据
    print(q.get())  # 主进程获取
    print("主进程")
# 子进程和子进程通过消息队列来实现数据互通
from multiprocessing import Queue,Process

def process1(q):
    q.put("子进程1存放的数据!")

def process2(q):
    print(f'子进程2取子进程1存放的数据:>>>{q.get()}')

if __name__ == '__main__':
    q = Queue()
    p1 = Process(target=process1,args=(q,))
    p2 = Process(target=process2,args=(q,))
    p1.start()
    p2.start()

生产者消费者模型

👉

  • 生产者:负责产生数据
  • 消费者:负责处理数据
    爬虫领域用的比较多,该模型需要解决供需不平衡的现象
# 使用到模块JoinableQueue,能够监测谁给队列中放数据和取数据
from multiprocessing import Queue, Process, JoinableQueue
import time
import random

def producer(name, food, q):
    for i in range(10):
        print('%s 生产了 %s' % (name, food))
        q.put(food)
        time.sleep(random.random())

def consumer(name, q):
    while True:
        data = q.get()
        print('%s 吃了 %s' % (name, data))
        q.task_done()

if __name__ == '__main__':
    # q = Queue()
    q = JoinableQueue()   # 队列模块换一下
    p1 = Process(target=producer, args=('厨子Hammer', '玛莎拉', q))
    p2 = Process(target=producer, args=('印度阿三', '飞饼', q))
    p3 = Process(target=producer, args=('泰国阿人', '榴莲', q))
    c1 = Process(target=consumer, args=('班长阿飞', q))

    p1.start()
    p2.start()
    p3.start()
    c1.daemon = True   # 守护进程
    c1.start()

    p1.join()
    p2.join()
    p3.join()

    q.join()  # 等待队列中所有的数据被取干净

    print('主进程')
    
# 使用模块JoinableQueue并且添加守护进程和join方法是为了让消费者结束等待的情况

相关文章