Getting Storm topology statistics programmatically

Posted by kxe2p93d on 2021-06-21 in Storm

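I'm building a monitoring service around my Storm topology and want to get the number of failed tuples over different time windows, the way the Storm UI shows failed tuples for the 10m, 3h and 1d windows.
My monitoring service is currently written in Python, so I'd appreciate an answer that uses a Python library or something language-agnostic (such as the CLI or a REST endpoint). I've looked through the storm CLI and the docs, but so far I've come up empty on where the Storm UI actually gets its information from.
Edit: we are running Storm 0.8.2 (unfortunately out of my control), so the Storm UI REST API (released in 0.9.2) is not an option until we upgrade.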

xdnvmnnf, answer #1

I fetch this with Python and restart the topology (via monit) if "failed" gets too high:

import json
import logging
import os
import time
import urllib.request  # Python 3; the original answer used urllib2 (Python 2)

logger = logging.getLogger(__name__)
count = 0  # number of high-failure checks since the last restart

def check_topology(host, port, topology_name, monitor_name):
    global count
    base = 'http://' + host + ':' + str(port) + '/api/v1/topology/'
    # Look up the topology id by name in the cluster summary.
    summary = json.loads(urllib.request.urlopen(base + 'summary').read())
    url_pid = None
    for data in summary['topologies']:
        if data['name'] == topology_name:
            url_pid = data['id']
            break
    if url_pid is None:
        print('no topology')
        return
    data_content = json.loads(urllib.request.urlopen(base + url_pid).read())
    # First entry of topologyStats (the API returns one entry per time window).
    stats = data_content['topologyStats'][0]
    # Counters can come back as null (None) when there is no data yet.
    failed = stats.get('failed') or 0
    acked = stats.get('acked') or 0
    if acked < failed * 10:
        count += 1
        if count == 2:
            os.system('monit restart ' + monitor_name)
            logger.info('restart at ' + time.strftime('%Y-%m-%d %H:%M:%S', time.localtime()))
            count = 0
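The question asks for failures per window (10m, 3h, 1d), while the script above only reads the first `topologyStats` entry. A minimal sketch, assuming (as in the Storm UI REST API docs for Storm 0.9.2+) that each `topologyStats` entry carries a `window` field (`"600"`, `"10800"`, `"86400"` or `":all-time"`) next to `failed`; `WINDOW_LABELS`, `failed_by_window` and `failure_too_high` are hypothetical helper names, not part of any Storm library.

```python
from typing import Dict, Optional

# Hypothetical mapping from the API's "window" values (seconds or ":all-time")
# to the labels the Storm UI displays.
WINDOW_LABELS = {"600": "10m", "10800": "3h", "86400": "1d", ":all-time": "all-time"}


def failed_by_window(topology: dict) -> Dict[str, int]:
    """Return failed-tuple counts keyed by window label.

    `topology` is the parsed JSON of /api/v1/topology/<id>; each
    topologyStats entry is assumed to have "window" and "failed" fields.
    """
    result = {}
    for stats in topology.get("topologyStats", []):
        window = str(stats.get("window"))
        # Fall back to the raw window value if it is not one we know.
        label = WINDOW_LABELS.get(window, window)
        # A null counter (None) is treated as 0.
        result[label] = stats.get("failed") or 0
    return result


def failure_too_high(acked: Optional[int], failed: Optional[int], factor: int = 10) -> bool:
    """Same rule as the script above: acked < failed * factor, with None as 0."""
    return (acked or 0) < (failed or 0) * factor


if __name__ == "__main__":
    sample = {"topologyStats": [
        {"window": "600", "failed": 3, "acked": 100},
        {"window": "10800", "failed": None},
        {"window": ":all-time", "failed": 42},
    ]}
    print(failed_by_window(sample))  # {'10m': 3, '3h': 0, 'all-time': 42}
```

Keying by `window` rather than by list position avoids depending on the order in which the API lists the windows.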

If you want to know more, see http://chenshuxiang.applinzi.com/index.php/2017/09/13/storm-ui/

ryhaxcpt, answer #2

Use the Storm UI REST API:

sqlInjection@foo:~$ curl http://$STORM_UI_HOST_AND_PORT/api/v1/topology/summary

{"topologies":[{"id":"topology-1-1436004781","encodedId":"topology-1-1436004781","name":"topology-1","status":"ACTIVE","uptime":"40d 21h 51m 59s","tasksTotal":16,"workersTotal":1,"executorsTotal":10}]}

sqlInjection@foo:~$ curl http://$STORM_UI_HOST_AND_PORT/api/v1/topology/topology-1-1436004781

{"msgTimeout":30,"spouts":[{"executors":3,"emitted":22336820,"errorLapsedSecs":755996,"completeLatency":"232.052","transferred":22336820,"acked":22340300,"errorPort":6703,"spoutId":"kafkaspout removed","tasks":3,"errorHost":"removed","lastError":"java.lang.RuntimeException: java.lang.NullPointerException\n\tat backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128)\n\tat backtype.storm.utils.DisruptorQueue.consumeBatch(Di","errorWorkerLogLink":"http://host:port/log?file=worker-6703.log","failed":0,"encodedSpoutId":"kafkaspout removed"}],"executorsTotal":8,"uptime":"67d 21h 15m 2s","encodedId":"topology-1-1436004781","visualizationTable":[{":row":[{":stream":"default",":sani-stream":"default1544803905",":checked":true},{":stream":"__ack_init",":sani-stream":"s__ack_init973324006",":checked":false},{":stream":"__ack_ack",":sani-stream":"s__ack_ack1278315507",":checked":false},{":stream":"__ack_fail",":sani... (removed)
As you can see, you can even capture the last error that occurred in a bolt or spout.
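To turn those `lastError` fields into alerts, a minimal sketch could walk the spouts and bolts of the parsed /api/v1/topology/<id> response. The spout field names (`spoutId`, `lastError`, `errorHost`, `errorPort`) appear in the output above; the `bolts` list and its `boltId` key are an assumption based on the Storm UI REST API docs, and `last_errors` is a hypothetical helper name.

```python
from typing import Dict, List


def last_errors(topology: dict) -> List[Dict[str, str]]:
    """Collect non-empty lastError entries from spouts and bolts.

    `topology` is the parsed JSON of /api/v1/topology/<id>; the "bolts"
    list and its "boltId" key are assumed, mirroring "spouts"/"spoutId".
    """
    errors = []
    for kind, id_key in (("spouts", "spoutId"), ("bolts", "boltId")):
        for comp in topology.get(kind, []):
            err = comp.get("lastError") or ""
            if not err:
                continue  # component has reported no error
            errors.append({
                "component": comp.get(id_key, "?"),
                "host": comp.get("errorHost", ""),
                "port": str(comp.get("errorPort", "")),
                # Keep only the first line of the stack trace for alerting.
                "error": err.splitlines()[0],
            })
    return errors
```

With the spout from the response above, this would yield one entry whose `error` is `java.lang.RuntimeException: java.lang.NullPointerException`.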
