我正在尝试启动接收器进程。这是一个类,它只从socket接收数据。例如,它只会在csv文件中写入行。我会通过管道发送出口信号。在该信号之后,单独过程中的while循环将结束,并且该单独的写入器过程将结束。
import multiprocessing
from datetime import datetime
import time
import datetime
import csv
from multiprocessing import Process, Pipe
class CSVWriter:
def __init__(self, pipe: multiprocessing.Pipe):
self.pipe = pipe
def write(self):
with open(f'RTD.csv', 'w') as csvfile:
writer = csv.DictWriter(csvfile, delimiter=',', fieldnames=['date'])
while self.pipe.recv() != 'exit':
writer.writerow({'date': (datetime.datetime.now())})
time.sleep(1)
print('exit csv')
pipe_receiver, pipe_sender = Pipe()
def write_csv(pipe):
csv_writer = CSVWriter(pipe=pipe)
csv_writer.write()
process = Process(target=write_csv, args=(pipe_receiver,))
process.start()
pipe_sender.send(input())
process.join()
process.close()
字符串
但是,与我的期望相反,它没有创建文件。或者,如果我输入了错误的退出键,它会创建空文件。
我挣扎与管道大约2天,不能理解的问题。我将非常感谢任何信息。
P.S.我使用的是进程BC,我需要从套接字接收数据非常快(在示例中,我只是将数据写入文件)。所以,我不能使用线程。
1条答案
按热度按时间qcbq4gxm1#
首先,您的客户端打开File并输入
with
-语句,然后文件应该被刷新到磁盘并关闭。pipe.recv()
会阻止执行,直到数据通过管道发送,这种情况只有在调用pipe_sender.send()
时才会发生。您将input()
的结果直接传递到管道中,其结果由recv()
接收。当input()
返回除了'exit'之外的内容时,recv()
将再次被调用,但这一次,父进程不会再发送数据,因为'pipe_sender.send(input())'语句只执行了一次。因此,recv()
将无限期阻塞,从而阻止with
-语句退出。因此,文件的写缓冲区永远不会刷新,并且它保持为空。此外,你的父进程随后调用join()
,这会导致死锁:父进程正在等待子进程退出,但它不能这样做,因为它正忙碌等待来自具有
recv()
的父进程的数据。子进程正在等待父进程使用
recv()
向它发送数据,但它不能这样做,因为它正忙碌等待子进程退出。当“exit”被输入时,while-loop被完全跳过,程序的其余部分按照您的期望执行;在这种情况下也会创建一个空文件。
最好的解决方案是使用线程(线程通常比进程创建更快,并且更容易处理,因为你可以访问相同的变量和对象)或者(因为你调用join并等待进程),在同一个程序中完成所有事情