使用kafka 0.8.2.0跟踪主题大小和使用者延迟

wxclj1h5  于 2021-06-08  发布在  Kafka
关注(0)|答案(4)|浏览(361)

自从Kafka0.8.2.0以来,跟踪消费者的滞后和主题大小似乎变得非常困难
如何跟踪Kafka中的偏移量(主题大小)和延迟?当你的制作者插入一条消息时,你是否在某处增加一个计数器,当你的消费者确认一条消息时,你是否增加另一个计数器?
我使用的是airbnb的kafka-statsd-metrics2,但是所有关于主题大小的指标都是 0 出于某种原因,这可能是他们的错误报告,但你怎么做呢?
我们的消费者和生产者是用kafka python用python编写的,他们说他们不支持consumercoordinator offset api,所以我准备了一个查询zookeeper并将这些度量发送到statsd示例的解决方案(看起来很尴尬),但是我仍然缺少主题大小度量。
我们使用collectd来收集系统指标,我没有使用jmx的经验,在collectd中配置它似乎非常复杂,我尝试过几次,所以我找到了一些不这样做的方法。
如果您有任何意见,我很乐意听到,即使是:“这属于x stackexchange网站”

webghufk

webghufk1#

有人能说memberassignment是从哪个模块导入的吗?
memberassignment.decode(member\u assignment)中的for(主题,分区)。assignment:

blmhpbnm

blmhpbnm2#

你用过吗https://github.com/quantifind/kafkaoffsetmonitor 用于监控消费滞后。它适用于0.8.2.0

hsgswve4

hsgswve43#

如果我理解正确,你可以 HighwaterMarkOffsetFetchResponse . 通过这种方式,您将知道分区末尾的偏移量是多少,并能够将其与当前确认的偏移量或此分区中最后一条消息的偏移量进行比较 FetchResponse 例如。
此处显示详细信息

mwecs4sa

mwecs4sa4#

以下是代码片段,请确保在活动控制器中运行此程序。引导服务器是活动的控制器ip。

client = KafkaAdminClient(bootstrap_servers=BOOTSTRAP_SERVERS, request_timeout_ms=300)
      list_groups_request  = client.list_consumer_groups()

      for group in list_groups_request:
        if group[1] == 'consumer':
          list_mebers_in_groups = client.describe_consumer_groups([group[0]])
          (error_code, group_id, state, protocol_type, protocol, members) = list_mebers_in_groups[0]

          if len(members) !=0:
            for member in members:
              (member_id, client_id, client_host, member_metadata, member_assignment) = member
              member_topics_assignment = []
              for (topic, partitions) in MemberAssignment.decode(member_assignment).assignment:
                member_topics_assignment.append(topic)

              for topic in member_topics_assignment:
                consumer = KafkaConsumer(
                          bootstrap_servers=BOOTSTRAP_SERVERS,
                          group_id=group[0],
                          enable_auto_commit=False
                          )
                consumer.topics()

                for p in consumer.partitions_for_topic(topic):
                  tp = TopicPartition(topic, p)
                  consumer.assign([tp])
                  committed = consumer.committed(tp)
                  consumer.seek_to_end(tp)
                  last_offset = consumer.position(tp)
                  if last_offset != None and committed != None:
                    lag = last_offset - committed
                    print "group: {} topic:{} partition: {} lag: {}".format(group[0], topic, p, lag)

相关问题