java - Passing real data to a Storm spout using a non-JVM language in Twitter Storm

bksxznpy, posted on 2021-06-24 in Storm

I'm having trouble understanding how to pass real data to a spout. For example:
I have these two files (they work fine):


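#!/usr/bin/env python
# rand_lines.py: print "<pid>\t<counter>" lines at random intervals

import os, random, sys, time

for i in range(50):
    print("%s\t%s" % (os.getpid(), i))
    sys.stdout.flush()  # push each line to the reader immediately
    time.sleep(random.randint(0, 5))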


#!/usr/bin/env python

from __future__ import print_function
from select import select
from subprocess import Popen,PIPE

p = Popen(['./rand_lines.py'], stdout=PIPE, bufsize=1, close_fds=True,  universal_newlines=True) 

timeout = 0.1 # seconds
while p:
    # stop once the child process has finished
    if p.poll() is not None: # process ended
        print(p.stdout.read(), end='') # read the rest
        p.stdout.close()
        break

    # wait until there is something to read
    rlist = select([p.stdout], [],[], timeout)[0]

    # read a line from each process that has output ready
    for f in rlist:
        print(f.readline(), end='') #NOTE: it can block

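Now imagine I want to pass those random lines to a spout for further processing. I tried something like this:

import os, random, sys
from uuid import uuid4
from select import select
from subprocess import Popen, PIPE
import storm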

class TwitterSpout(storm.Spout):

    def initialize(self, conf, context):
        self.pid = os.getpid()
        try:
            self.p= Popen(['./rand_lines.py'], stdout=PIPE, bufsize=1, close_fds=True,  universal_newlines=True)
        except OSError as e:
            self.log('%s'%e)
            sys.exit(1)

And in nextTuple():

def nextTuple(self):
    timeout = 0.1 # seconds
    while self.p:
        # remove finished processes from the list
        if self.p.poll() is not None: # process ended
            self.log ("%s"%self.p.stdout.read()) # read the rest
            self.p.stdout.close()
            processes.remove(self.p)

        # wait until there is something to read
        rlist = select([self.p.stdout], [],[], timeout)[0]

        # read a line from each process that has output ready
        for f in rlist:
            self.log ("%s%s"%f.readline()) #NOTE: it can block
            msgId = random.randint(0,500)
            self.log('MSG IN SPOUT %s\n'%msgId)
            storm.emit([f.readline()], id=msgId)

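But this structure does not work. I always get the error "Pipe seems to be broken...", or, if I try different variations of this code, the process blocks and Storm never reaches nextTuple. Please help me solve this problem. An example of how to do something similar, or just some advice, would also be welcome. Thank you.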

2eafrhcq #1

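There could be multiple issues:
There is no break in the while loop, so it loops forever.
You call f.readline() twice. You probably meant to call it only once after each select.
To avoid blocking, use data = os.read(f.fileno(), 1024) after the select.
I don't know whether it is acceptable for nextTuple() to block until the child process exits.
If all you do is read lines from the subprocess, then you don't need select: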

from subprocess import Popen, PIPE, DEVNULL

def iter_lines(*args):
    # run the command with stdin/stderr discarded and yield its stdout line by line
    p = Popen(args, stdin=DEVNULL, stdout=PIPE, stderr=DEVNULL,
              close_fds=True)
    for line in iter(p.stdout.readline, b''): # expect b'\n' newline
        yield line
    p.stdout.close()
    return p.wait() # exit status becomes StopIteration.value (Python 3.3+)

Example:


# ...

self.lines = iter_lines(sys.executable, '-u', 'rand_lines.py')

# ...

def nextTuple(self):
    try:
        line = next(self.lines).decode('ascii', 'ignore')
    except StopIteration as e:
        # the child has exited; only the first StopIteration carries its exit status
        if e.value is not None:
            self.exit_status = e.value
    else:
        storm.emit([line.strip()])
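
If nextTuple() must never block, the select plus os.read() suggestion from the list above could be sketched roughly as follows. This is only an illustration under assumptions: it targets Python 3 on a POSIX system (select does not work on pipes on Windows), and the names NonBlockingLineReader and poll_lines are made up for this example; they are not part of Storm or of the original code.

```python
import os
import sys
from select import select
from subprocess import Popen, PIPE


class NonBlockingLineReader:
    """Hypothetical helper: collects complete lines from a child process without blocking."""

    def __init__(self, args):
        self.proc = Popen(args, stdout=PIPE)
        self.fd = self.proc.stdout.fileno()
        self.buffer = b''   # bytes received so far that do not yet form a full line
        self.eof = False    # set once the child has closed its stdout

    def poll_lines(self, timeout=0.1):
        """Return the complete lines that became available within `timeout` seconds."""
        if not self.eof and select([self.fd], [], [], timeout)[0]:
            data = os.read(self.fd, 1024)  # returns at once with whatever is ready
            if data:
                self.buffer += data
            else:  # an empty read means the child closed its stdout
                self.eof = True
                self.proc.stdout.close()
                self.proc.wait()
        # everything before the last newline is complete; keep the remainder buffered
        *lines, self.buffer = self.buffer.split(b'\n')
        if self.eof and self.buffer:  # flush a final line that lacks a newline
            lines.append(self.buffer)
            self.buffer = b''
        return [line.decode('ascii', 'ignore') for line in lines]
```

With such a helper created in initialize(), for example self.reader = NonBlockingLineReader([sys.executable, '-u', 'rand_lines.py']), nextTuple() could call self.reader.poll_lines() once and pass each returned line to storm.emit([line]), returning immediately when no line is ready.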
