I'm having trouble understanding how to pass real data to a spout. For example, I have two files (both work fine):
```python
#!/usr/bin/env python
import os, random, sys, time

for i in range(50):
    print("%s\t%s" % (os.getpid(), i))
    sys.stdout.flush()
    time.sleep(random.randint(0, 5))
```
and
```python
#!/usr/bin/env python
from __future__ import print_function
from select import select
from subprocess import Popen, PIPE

p = Popen(['./rand_lines.py'], stdout=PIPE, bufsize=1, close_fds=True,
          universal_newlines=True)
timeout = 0.1  # seconds
while True:
    # stop once the child process has finished
    if p.poll() is not None:  # process ended
        print(p.stdout.read(), end='')  # read the rest
        p.stdout.close()
        break
    # wait until there is something to read
    rlist = select([p.stdout], [], [], timeout)[0]
    # read a line from the process if it has output ready
    for f in rlist:
        print(f.readline(), end='')  # NOTE: it can block
```
Now imagine that I want to pass those random lines to a spout for further processing. I tried something like this:

```python
import os, random, sys
from uuid import uuid4
from select import select
from subprocess import Popen, PIPE
import storm

class TwitterSpout(storm.Spout):
    def initialize(self, conf, context):
        self.pid = os.getpid()
        try:
            self.p = Popen(['./rand_lines.py'], stdout=PIPE, bufsize=1,
                           close_fds=True, universal_newlines=True)
        except OSError as e:
            self.log('%s' % e)
            sys.exit(1)
```
And in `nextTuple()`:
```python
    def nextTuple(self):
        timeout = 0.1  # seconds
        while self.p:
            # remove finished processes from the list
            if self.p.poll() is not None:  # process ended
                self.log("%s" % self.p.stdout.read())  # read the rest
                self.p.stdout.close()
                processes.remove(self.p)
            # wait until there is something to read
            rlist = select([self.p.stdout], [], [], timeout)[0]
            # read a line from each process that has output ready
            for f in rlist:
                self.log("%s%s" % f.readline())  # NOTE: it can block
                msgId = random.randint(0, 500)
                self.log('MSG IN SPOUT %s\n' % msgId)
                storm.emit([f.readline()], id=msgId)
```
But this construct doesn't work: I always get the error "Pipe seems to be broken..."
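Or, if I try other variations of this code, the process blocks and Storm never gets to the next piece of code. Please help me solve this, whether with an example of how to do something similar or just some advice. Thank you.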
1 Answer
There may be several problems.
There is no `break` in the `while` loop, so it is an infinite loop. You call `f.readline()` twice; you probably meant to call it only once per `select`.
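Both points can be illustrated with a minimal, Storm-agnostic sketch, assuming Python 3 on a POSIX system (where `select` works on pipes). `next_tuple` and its `emit` parameter are hypothetical names; `emit` is called the same way the question calls `storm.emit`. The function does one `select`, reads one line, emits that same line, and returns instead of looping forever.

```python
import random
import select
import subprocess
import sys


def next_tuple(proc, emit, timeout=0.1):
    """Do at most one unit of work per call, then return.

    proc    -- a subprocess.Popen started with stdout=PIPE in text mode
    emit    -- hypothetical callable, used like storm.emit(tup, id=...)
    Returns False once the child's stdout reaches EOF, True otherwise.
    """
    # Wait at most `timeout` seconds for the child's stdout to be readable.
    ready, _, _ = select.select([proc.stdout], [], [], timeout)
    if not ready:
        return True  # nothing yet; the caller can simply call again later
    # Call readline() exactly once per select(), and emit that same line.
    # NOTE: readline() can still block if only part of a line is available.
    line = proc.stdout.readline()
    if not line:  # empty string means EOF: the child closed its stdout
        proc.stdout.close()
        proc.wait()
        return False
    emit([line], id=random.randint(0, 500))
    return True


if __name__ == "__main__":
    # Demo with a tiny child process standing in for ./rand_lines.py.
    child = subprocess.Popen([sys.executable, "-c", "print('a'); print('b')"],
                             stdout=subprocess.PIPE, universal_newlines=True)
    while next_tuple(child, lambda tup, id=None: print("emitted", tup)):
        pass
```

Because each call returns after a bounded wait, the caller (Storm, in a real spout) stays in control of how often the function runs.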
To avoid blocking, use `data = os.read(f.fileno(), 1024)` after `select`.
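A minimal sketch of that advice, assuming Python 3 on POSIX. `LineReader` is a hypothetical helper: it calls `os.read()` only after `select()` reports the pipe readable, so the read returns whatever bytes are available instead of waiting for a full line, and it keeps partial lines in a buffer until their newline arrives.

```python
import os
import select
import subprocess
import sys


class LineReader(object):
    """Hypothetical helper: hands out complete lines from a pipe without blocking."""

    def __init__(self, pipe, chunk_size=1024):
        self.fd = pipe.fileno()
        self.chunk_size = chunk_size
        self.buffer = b""
        self.eof = False

    @property
    def done(self):
        # True once the writer closed the pipe and every byte was handed out.
        return self.eof and not self.buffer

    def poll_line(self, timeout=0.1):
        """Return one line as bytes, or None if no complete line is ready yet."""
        if b"\n" not in self.buffer and not self.eof:
            ready, _, _ = select.select([self.fd], [], [], timeout)
            if ready:
                # After select(), os.read() returns the bytes that are available
                # (up to chunk_size) instead of waiting for a newline.
                data = os.read(self.fd, self.chunk_size)
                if data:
                    self.buffer += data
                else:
                    self.eof = True  # b"" means the writer closed the pipe
        if b"\n" in self.buffer:
            line, _, self.buffer = self.buffer.partition(b"\n")
            return line + b"\n"
        if self.eof and self.buffer:
            # Trailing output that was not terminated by a newline.
            line, self.buffer = self.buffer, b""
            return line
        return None


if __name__ == "__main__":
    child = subprocess.Popen([sys.executable, "-c", "print('a'); print('b')"],
                             stdout=subprocess.PIPE)
    reader = LineReader(child.stdout)
    while not reader.done:
        line = reader.poll_line()
        if line is not None:
            print("got", line)
    child.stdout.close()
    child.wait()
```

Reading the raw file descriptor also sidesteps a subtle problem with mixing `select()` and buffered file objects: `readline()` may pull several lines into Python's internal buffer, and `select()` knows nothing about data that is already buffered.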
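I don't know whether it is acceptable for `nextTuple()` to block until the child process exits. If all you do is read lines from the subprocess, then you don't need `select`. Example: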
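A minimal sketch of reading lines without `select`, assuming Python 3; `iter_lines` is a hypothetical helper name. Each `readline()` call blocks until the child writes a complete line or exits, which is fine when reading lines is all the caller does.

```python
import subprocess
import sys


def iter_lines(cmd):
    """Yield the child's stdout line by line as it is produced (hypothetical helper)."""
    p = subprocess.Popen(cmd, stdout=subprocess.PIPE, bufsize=1,
                         universal_newlines=True)
    with p.stdout:
        # iter(callable, sentinel) keeps calling readline() until it returns ''
        # (EOF); each call blocks until a full line arrives or the child exits.
        for line in iter(p.stdout.readline, ''):
            yield line
    p.wait()  # reap the child once its output is exhausted


if __name__ == "__main__":
    for line in iter_lines([sys.executable, "-c", "print('a'); print('b')"]):
        print("got", line, end="")
```

In a spout, this might look like `self.lines = iter_lines(['./rand_lines.py'])` in `initialize()` and `line = next(self.lines, None)` in `nextTuple()` (both attribute names hypothetical). That call blocks until the next line arrives, so whether it suits Storm is the open question above.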