如何使用kafka主题并通过http解析/服务?

ej83mcc0  于 2021-06-06  发布在  Kafka
关注(0)|答案(1)|浏览(396)

我试图使用python中的kafka主题,并使用prometheus客户机通过http提供服务,但我似乎在主题使用上遇到了障碍。我放置了一些占位符来简单地添加度量,但是看起来这部分被阻塞了。

import os
from pykafka import KafkaClient
import threading
from kafka import KafkaConsumer
from prometheus_client import start_http_server, Metric, REGISTRY

class CustomCollector(threading.Thread):
    daemon = True

    def collect(self):
        client = KafkaClient(hosts=os.environ['KAFKA_ADDRESS'])
        topic = client.topics[b'os.environ['KAFKA_TOPIC']
        consumer = topic.get_simple_consumer()
        for message in consumer:
            if message is not None:
                print(message.value)

        metric = Metric('test_name', 'description', 'summary')
        metric.add_sample('test_name', 'description', 'summary')
        yield metric

if __name__ == '__main__':
    start_http_server(9998)
    REGISTRY.register(CustomCollector())
    while True: time.sleep(1)

如果我运行代码,我会看到主题数据按预期流式传输到控制台。但是,我的度量端点从不填充,对web服务器的任何请求都会挂起,直到我杀死应用程序,它会用库中的标准度量来响应。

hk8txs48

hk8txs481#

建筑 Metric 示例应该在每个已使用的消息中发生一次。也就是说 Metric() 电话应该在 for message in consumer 循环。另外,你可能想用 message.value 在创建 Metric 示例。

相关问题