python异步IO脚本未完成

mjqavswn  于 2021-09-08  发布在  Java
关注(0)|答案(1)|浏览(447)

我正在构建一个使用asyncio和3个队列的python脚本。我用4个步骤处理来自不同来源的数据,想法是使用队列保存一个步骤的结果,以便尽可能快地用于下一个步骤。脚本正在做它应该做的事情,但是由于某些原因,我不知道当所有的数据都被处理后,脚本没有完成。为了理解这个问题,我构建了一个简化版本的脚本,在这里我做简单的数学运算。
首先,我用50个介于0和10之间的randon数填充第一个队列。接下来,我获取存储在queue1中的数,将其平方,并将结果放入queue2。接下来,我得到存储在queue2中的平方数,将其加倍并将结果存储在queue3中。最后,我得到存储在queue3中的最终结果,并将其附加到 Dataframe ,然后将结果保存到文件中。
正如我所说。上面描述的过程是有效的,但当我处理完所有elemens queue3时,我希望该过程会完成。
这是我为演示我的问题而构建的玩具代码的第一个版本

import os
import warnings
import asyncio
import random
import pandas as pd
from datetime import datetime

os.environ['PYTHONASYNCIODEBUG'] = '1'
warnings.resetwarnings()

class asyncio_toy():

def __init__(self):
    self.df = pd.DataFrame(columns=['id','final_value'])

async def generate_random_number(self,i:int,queue):
    for k in range(50):
        r=random.randint(0,10)
        #await asyncio.sleep(r)
        await queue.put((k,r))

async def square_it(self,n,queue1,queue2):
    while True:
        print(f'{datetime.now()} - START SQUARE IT task {n} q1: {str(queue1.qsize()).zfill(2)} - q2:{str(queue2.qsize()).zfill(2)}')
        r=await queue1.get()
        await asyncio.sleep(5)
        await queue2.put((r[0],r[1]*r[1]))
        queue1.task_done()
        print(f'{datetime.now()} - END SQUARE IT   task {n} q1: {str(queue1.qsize()).zfill(2)} - q2:{str(queue2.qsize()).zfill(2)}')

async def double_it(self,n,queue2,queue3):
    while True:
        print(f'{datetime.now()} - START DOUBLE IT task {n} q2: {str(queue2.qsize()).zfill(2)} - q3:{str(queue3.qsize()).zfill(2)}')
        r=await queue2.get()
        await asyncio.sleep(10)
        await queue3.put((r[0],2*r[1]))
        queue2.task_done()
        print(f'{datetime.now()} - END DOUBLE IT   task {n} q2: {str(queue2.qsize()).zfill(2)} - q3:{str(queue3.qsize()).zfill(2)}')

async def save_it(self,n,queue3):
    while True:
        print(f'{datetime.now()} - START SAVE IT   task {n} q3: {str(queue3.qsize()).zfill(2)}')
        r=await queue3.get()
        await asyncio.sleep(1)
        self.df.loc[len(self.df)]=[r[0],r[1]]
        self.df.to_csv('final_result.csv')
        queue3.task_done()
        print(f'{datetime.now()} - END SAVE IT     task {n} q3: {str(queue3.qsize()).zfill(2)}')

async def main(self):
    queue1 = asyncio.Queue() # store the randon number
    queue2 = asyncio.Queue() # stores the squared number
    queue3 = asyncio.Queue() # store the final result

    rand_gen = [asyncio.create_task(self.generate_random_number(n,queue1)) for n in range(1)]
    square_scan = [asyncio.create_task(self.square_it(k,queue1,queue2)) for k in range(5)]
    double_scan = [asyncio.create_task(self.double_it(k,queue2,queue3)) for k in range(5)]
    save_scan = [asyncio.create_task(self.save_it(k,queue3)) for k in range(5)]

    await asyncio.gather(*rand_gen)
    await asyncio.gather(*square_scan)
    await asyncio.gather(*double_scan)
    await asyncio.gather(*save_scan)

    await queue1.join()
    await queue2.join() 
    await queue3.join()

    for a in square_scan:
        a.cancel()

    for b in square_scan:
        b.cancel()

    for c in save_scan:
        c.cancel()

### testing

if __name__ == '__main__':

    toy=asyncio_toy()

    asyncio.run(toy.main())

对这个问题做一些研究,我发现这是另一个问题
[1] :有效使用多个异步IO队列,建议不要使用queue.join和sentinel shutdonw。

import os
import warnings
import asyncio
import random
import pandas as pd
from datetime import datetime

os.environ['PYTHONASYNCIODEBUG'] = '1'
warnings.resetwarnings()

class asyncio_toy():

def __init__(self):
    self.df = pd.DataFrame(columns=['id','final_value'])

async def generate_random_number(self,i:int,queue1):
    for k in range(50):
        r=random.randint(0,10)
        queue1.put_nowait((k,r))
    queue1.put_nowait(None)

async def square_it(self,n,queue1,queue2):
    while True:
        print(f'{datetime.now()} - START SQUARE IT task {n} q1: {str(queue1.qsize()).zfill(2)} - q2:{str(queue2.qsize()).zfill(2)}')
        r=await queue1.get()
        if r is None:
            await queue2.put(None)
            break

        await asyncio.sleep(5)
        await queue2.put((r[0],r[1]*r[1]))
        queue1.task_done()
        print(f'{datetime.now()} - END SQUARE IT   task {n} q1: {str(queue1.qsize()).zfill(2)} - q2:{str(queue2.qsize()).zfill(2)}')

async def double_it(self,n,queue2,queue3):
    while True:
        print(f'{datetime.now()} - START DOUBLE IT task {n} q2: {str(queue2.qsize()).zfill(2)} - q3:{str(queue3.qsize()).zfill(2)}')
        r=await queue2.get()
        if r is None:
            await queue3.put(None)
            break

        await asyncio.sleep(10)
        await queue3.put((r[0],2*r[1]))
        queue2.task_done()
        print(f'{datetime.now()} - END DOUBLE IT   task {n} q2: {str(queue2.qsize()).zfill(2)} - q3:{str(queue3.qsize()).zfill(2)}')

async def save_it(self,n,queue3):
    while True:
        print(f'{datetime.now()} - START SAVE IT   task {n} q3: {str(queue3.qsize()).zfill(2)}')
        r=await queue3.get()
        if r is None:
            break

        await asyncio.sleep(1)
        self.df.loc[len(self.df)]=[r[0],r[1]]
        self.df.to_csv('final_result.csv')
        queue3.task_done()
        print(f'{datetime.now()} - END SAVE IT     task {n} q3: {str(queue3.qsize()).zfill(2)}')

async def main(self):
    queue1 = asyncio.Queue() # store the randon number
    queue2 = asyncio.Queue() # stores the squared number
    queue3 = asyncio.Queue() # store the final result

    rand_gen = [asyncio.create_task(self.generate_random_number(n,queue1)) for n in range(1)]
    square_scan = [asyncio.create_task(self.square_it(k,queue1,queue2)) for k in range(5)]
    double_scan = [asyncio.create_task(self.double_it(k,queue2,queue3)) for k in range(5)]
    save_scan = [asyncio.create_task(self.save_it(k,queue3)) for k in range(5)]

    await asyncio.gather(*rand_gen)
    await asyncio.gather(*square_scan)
    await asyncio.gather(*double_scan)
    await asyncio.gather(*save_scan)

    for a in square_scan:
        a.cancel()

    for b in square_scan:
        b.cancel()

    for c in save_scan:
        c.cancel()

### testing

if __name__ == '__main__':

    toy=asyncio_toy()

    asyncio.run(toy.main())

但这并没有解决问题。我还尝试从类定义中删除函数,但效果不太好。
我开始使用asyncio模块,我想我犯了一些我看不到的基本错误。欢迎提供任何提示。
更新
我进一步简化了这个问题,得到了一些有趣的结果,可以得出答案。我创建了另一个玩具代码,它只使用一个队列来存储初始randon编号。代码从此队列中获取数字,并将其平方,然后以Termal格式打印。这种代码的和平结束了。所以我认为问题可能在某种程度上与我使用多个队列这一事实有关。

import asyncio
import random

class asyncio_toy():

    def __init__(self):
        ...

    async def generate_random_number(self,i:int,queue):
        for _ in range(i):
            r=random.randint(0,5)
            await asyncio.sleep(r)
            await queue.put((i,r))

    async def square_scan(self,k,queue):
        while True:
            (i,r) = await queue.get()
            print(f'prod {i} - cons {k} - {r} - {r*r}')
            queue.task_done()

    async def main(self):
        queue = asyncio.Queue()
        prod = [asyncio.create_task(self.generate_random_number(n,queue)) for n in range(5)]
        cons = [asyncio.create_task(self.square_scan(k,queue)) for k in range(4)]

        await asyncio.gather(*prod)
        await queue.join() 

        for c in cons:
            c.cancel()

### testing

if __name__ == '__main__':

    toy=asyncio_toy()

    asyncio.run(toy.main())
eqzww0vc

eqzww0vc1#

如果我发送 None 五次,因为有五个函数使用相同的 queue 他们都需要 None 退出 while -循环。

for x in range(5): 
     queue.put(None)
import os
import warnings
import asyncio
import random
import pandas as pd
from datetime import datetime

os.environ['PYTHONASYNCIODEBUG'] = '1'
warnings.resetwarnings()

class asyncio_toy():

    def __init__(self):
        self.df = pd.DataFrame(columns=['id','final_value'])

    async def generate_random_number(self, i:int, queue):
        for k in range(10):
            r = random.randint(0, 10)
            #await asyncio.sleep(r)
            await queue.put((k,r))

        for x in range(5):
            await queue.put(None)

    async def square_it(self,n,queue1,queue2):
        while True:
            #print(f'{datetime.now()} - START SQUARE IT task {n} q1: {str(queue1.qsize()).zfill(2)} - q2:{str(queue2.qsize()).zfill(2)}')
            r = await queue1.get()

            if r is None:
                print('exit: SQUARE IT', n)  
                await queue2.put(None)
                break

            k, r = r
            await asyncio.sleep(1)
            await queue2.put((k, r*r))
            queue1.task_done()
            #print(f'{datetime.now()} - END SQUARE IT   task {n} q1: {str(queue1.qsize()).zfill(2)} - q2:{str(queue2.qsize()).zfill(2)}')

    async def double_it(self,n,queue2,queue3):
        while True:
            #print(f'{datetime.now()} - START DOUBLE IT task {n} q2: {str(queue2.qsize()).zfill(2)} - q3:{str(queue3.qsize()).zfill(2)}')
            r = await queue2.get()

            if r is None:
                print('exit: DOUBLE IT', n)  
                await queue3.put(None)
                break

            k, r = r
            await asyncio.sleep(1)
            await queue3.put((k, 2*r))
            queue2.task_done()
            #print(f'{datetime.now()} - END DOUBLE IT   task {n} q2: {str(queue2.qsize()).zfill(2)} - q3:{str(queue3.qsize()).zfill(2)}')

    async def save_it(self,n,queue3):
        while True:
            #print(f'{datetime.now()} - START SAVE IT   task {n} q3: {str(queue3.qsize()).zfill(2)}')
            r = await queue3.get()

            if r is None:
                print('exit: SAVE IT', n)  
                break

            k, r = r
            await asyncio.sleep(1)
            self.df.loc[len(self.df)]=[k, r]
            self.df.to_csv('final_result.csv')
            queue3.task_done()
            #print(f'{datetime.now()} - END SAVE IT     task {n} q3: {str(queue3.qsize()).zfill(2)}')

    async def main(self):
        queue1 = asyncio.Queue() # store the randon number
        queue2 = asyncio.Queue() # stores the squared number
        queue3 = asyncio.Queue() # store the final result

        rand_gen = [asyncio.create_task(self.generate_random_number(n,queue1)) for n in range(1)]
        square_scan = [asyncio.create_task(self.square_it(k,queue1,queue2)) for k in range(5)]
        double_scan = [asyncio.create_task(self.double_it(k,queue2,queue3)) for k in range(5)]
        save_scan = [asyncio.create_task(self.save_it(k,queue3)) for k in range(5)]

        await asyncio.gather(*rand_gen)
        await asyncio.gather(*square_scan)
        await asyncio.gather(*double_scan)
        await asyncio.gather(*save_scan)

        print('join')

        #await queue1.join()
        #await queue2.join() 
        #await queue3.join()

        print('cancel')
        for a in square_scan:
            a.cancel()

        for b in double_scan:
            b.cancel()

        for c in save_scan:
            c.cancel()

### testing

if __name__ == '__main__':

    toy = asyncio_toy()

    asyncio.run(toy.main())

编辑:
使用的版本 while runningrunning = False 停止所有线程。

import os
import warnings
import asyncio
import random
import pandas as pd
from datetime import datetime

os.environ['PYTHONASYNCIODEBUG'] = '1'
warnings.resetwarnings()

class asyncio_toy():

    def __init__(self):
        self.df = pd.DataFrame(columns=['id','final_value'])
        self.running_square_it = True
        self.running_double_it = True
        self.running_save_it = True

    async def generate_random_number(self, i, queue):
        for k in range(20):
            r = random.randint(0, 10)
            #await asyncio.sleep(r)
            await queue.put((k, r))

        #for x in range(5):
        await queue.put(None)

    async def square_it(self, n, queue_input, queue_output):
        print(f'{datetime.now()} - START SQUARE IT task {n} q1: {str(queue_input.qsize()).zfill(2)} - q2:{str(queue_output.qsize()).zfill(2)}')

        while self.running_square_it:

            if not queue_input.empty():
                r = await queue_input.get()

                if r is None:
                    print('exit: SQUARE IT', n)  
                    await queue_output.put(None)
                    self.running_square_it = False
                else:
                    k, r = r
                    await queue_output.put((k, r*r))

            await asyncio.sleep(0.1)  # need it to run other loops

        print(f'{datetime.now()} - END SQUARE IT   task {n} q1: {str(queue_input.qsize()).zfill(2)} - q2:{str(queue_output.qsize()).zfill(2)}')

    async def double_it(self, n, queue_input, queue_output):
        print(f'{datetime.now()} - START DOUBLE IT task {n} q2: {str(queue_input.qsize()).zfill(2)} - q3:{str(queue_output.qsize()).zfill(2)}')

        while self.running_double_it:

            if not queue_input.empty():
                r = await queue_input.get()

                if r is None:
                    print('exit: DOUBLE IT', n)  
                    await queue_output.put(None)
                    self.running_double_it = False
                else:
                    k, r = r
                    await queue_output.put((k, 2*r))

            await asyncio.sleep(0.1)  # need it to run other loops

        print(f'{datetime.now()} - END DOUBLE IT   task {n} q2: {str(queue_input.qsize()).zfill(2)} - q3:{str(queue_output.qsize()).zfill(2)}')

    async def save_it(self, n, queue_input):
        print(f'{datetime.now()} - START SAVE IT   task {n} q3: {str(queue_input.qsize()).zfill(2)}')

        while self.running_save_it:

            if not queue_input.empty():
                r = await queue_input.get()

                if r is None:
                    print('exit: SAVE IT', n)  
                    self.running_save_it = False
                else:            
                    k, r = r
                    self.df.loc[len(self.df)] = [k, r]
                    self.df.to_csv('final_result.csv')

            await asyncio.sleep(0.1)  # need it to run other loops

        print(f'{datetime.now()} - END SAVE IT     task {n} q3: {str(queue_input.qsize()).zfill(2)}')

    async def main(self):
        queue1 = asyncio.Queue() # store the randon number
        queue2 = asyncio.Queue() # stores the squared number
        queue3 = asyncio.Queue() # store the final result

        rand_gen    = [asyncio.create_task(self.generate_random_number(n, queue1)) for n in range(1)]
        square_scan = [asyncio.create_task(self.square_it(k, queue1, queue2)) for k in range(5)]
        double_scan = [asyncio.create_task(self.double_it(k, queue2, queue3)) for k in range(10)]
        save_scan   = [asyncio.create_task(self.save_it(k, queue3)) for k in range(5)]

        await asyncio.gather(*rand_gen)
        await asyncio.gather(*square_scan)
        await asyncio.gather(*double_scan)
        await asyncio.gather(*save_scan)

        print('join')

        #await queue1.join()
        #await queue2.join() 
        #await queue3.join()

        print('cancel')
        #for a in square_scan:
        #    a.cancel()

        #for b in double_scan:
        #    b.cancel()

        #for c in save_scan:
        #    c.cancel()

### testing

if __name__ == '__main__':
    toy = asyncio_toy()
    asyncio.run(toy.main())

相关问题