KafkaJS - getting statistics (lag)

xytpbqjk  posted on 2023-03-28  in Apache

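In our nest.js application we use the kafkajs client for Kafka. We need to be able to monitor statistics, and one of the metrics is lag.
I am trying to figure out whether kafkajs provides anything useful for this. (The most interesting things in the payload are: timestamp, offset, batchContext.firstOffset, batchContext.firstTimestamp, batchContext.maxTimestamp.)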

Questions

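How can I log the lag value and the other statistics that kafkajs provides?
Should I consider implementing my own statistics monitor to collect the required information in a Node application that uses the kafkajs client?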

New details 1

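Following the documentation, I can get batch.highWatermark, where
batch.highWatermark is the last committed offset within the topic partition. It can be used to calculate lag.
Trying: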

await consumer.run({
    eachBatchAutoResolve: true,
    eachBatch: async (data) => {
      console.log('Received data.batch.messages: ', data.batch.messages)
      console.log('Received data.batch.highWatermark: ', data.batch.highWatermark)
    },
  })

I get the following output:

Received data.batch.messages:  [
  {
    magicByte: 2,
    attributes: 0,
    timestamp: '1628877419958',
    offset: '144',
    key: null,
    value: <Buffer 68 65 6c 6c 6f 21>,
    headers: {},
    isControlRecord: false,
    batchContext: {
      firstOffset: '144',
      firstTimestamp: '1628877419958',
      partitionLeaderEpoch: 0,
      inTransaction: false,
      isControlBatch: false,
      lastOffsetDelta: 2,
      producerId: '-1',
      producerEpoch: 0,
      firstSequence: 0,
      maxTimestamp: '1628877419958',
      timestampType: 0,
      magicByte: 2
    }
  },
  {
    magicByte: 2,
    attributes: 0,
    timestamp: '1628877419958',
    offset: '145',
    key: null,
    value: <Buffer 6f 74 68 65 72 20 6d 65 73 73 61 67 65>,
    headers: {},
    isControlRecord: false,
    batchContext: {
      firstOffset: '144',
      firstTimestamp: '1628877419958',
      partitionLeaderEpoch: 0,
      inTransaction: false,
      isControlBatch: false,
      lastOffsetDelta: 2,
      producerId: '-1',
      producerEpoch: 0,
      firstSequence: 0,
      maxTimestamp: '1628877419958',
      timestampType: 0,
      magicByte: 2
    }
  },
  {
    magicByte: 2,
    attributes: 0,
    timestamp: '1628877419958',
    offset: '146',
    key: null,
    value: <Buffer 6d 6f 72 65 20 6d 65 73 73 61 67 65 73>,
    headers: {},
    isControlRecord: false,
    batchContext: {
      firstOffset: '144',
      firstTimestamp: '1628877419958',
      partitionLeaderEpoch: 0,
      inTransaction: false,
      isControlBatch: false,
      lastOffsetDelta: 2,
      producerId: '-1',
      producerEpoch: 0,
      firstSequence: 0,
      maxTimestamp: '1628877419958',
      timestampType: 0,
      magicByte: 2
    }
  }
]
Received data.batch.highWatermark:  147

Any ideas on how to use batch.highWatermark to calculate lag?

yfwxisqw1#

It looks like the only way to get the offset lag metric is to use instrumentation events:

// Instrumentation listeners receive an event object; the metrics are in its `payload` field
consumer.on(consumer.events.END_BATCH_PROCESS, ({ payload }) =>
  console.log(payload.offsetLagLow),
);

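offsetLagLow measures the offset delta between the first message in the batch and the last offset in the partition (highWatermark). You can also use offsetLag, but it is based on the last offset of the batch.
As @Sergii mentioned, when you use eachBatch, some props are available directly (the documentation lists all the methods available on the batch prop). But if you use eachMessage, you don't get those props. So instrumentation events are the most general approach.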
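
The relationship between these two metrics and highWatermark can be sketched as a small pure function. This is only an illustration and not kafkajs code: the name batchLag is hypothetical, and it assumes that the last message in the partition sits at highWatermark - 1, which matches the sample output in the question (highWatermark 147, last message offset 146).

```javascript
// Sketch: derive both lag figures from a batch's offsets and the high watermark.
// kafkajs reports offsets as strings, so BigInt is used to avoid precision loss.
// `batchLag` is a hypothetical helper, not part of the kafkajs API.
function batchLag({ highWatermark, firstOffset, lastOffset }) {
  // Assumption: the last existing message is at highWatermark - 1
  const lastInPartition = BigInt(highWatermark) - 1n
  return {
    // Distance from the LAST message of the batch to the end of the partition
    offsetLag: (lastInPartition - BigInt(lastOffset)).toString(),
    // Distance from the FIRST message of the batch to the end of the partition
    offsetLagLow: (lastInPartition - BigInt(firstOffset)).toString(),
  }
}

// Values taken from the sample output in the question
const result = batchLag({ highWatermark: '147', firstOffset: '144', lastOffset: '146' })
console.log(result) // { offsetLag: '0', offsetLagLow: '2' }
```

With the batch from the question, the consumer is fully caught up once the batch is processed (offsetLag 0), while the first message of the batch was 2 offsets behind the end of the partition (offsetLagLow 2).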

7uzetpgm2#

This is the code we use to calculate the lag for each client/group/topic/partition.

import { Kafka, AssignerProtocol } from 'kafkajs'

// Returns one entry per group member / topic / partition with its consumer lag
async function lag (clientConfig) { 
    let status = []
    const kafkaClient = new Kafka(clientConfig)
    const admin = kafkaClient.admin()
    await admin.connect()
    const groups = await admin.listGroups()
    const groupsNames = groups.groups.map(x => x.groupId)
    const gd = await admin.describeGroups(groupsNames)
    let currentMapOfTopicOffsetByGroupId = {}
    for (const g of gd.groups) {
        const topicOffset = await admin.fetchOffsets({ groupId: g.groupId })
        if (currentMapOfTopicOffsetByGroupId[g.groupId] == undefined) {
            currentMapOfTopicOffsetByGroupId[g.groupId] = {}    
        }
        topicOffset.forEach(to => {
            to.partitions.forEach(p => {
                if (currentMapOfTopicOffsetByGroupId[g.groupId][to.topic] == undefined) {
                    currentMapOfTopicOffsetByGroupId[g.groupId][to.topic] = {}
                }
                currentMapOfTopicOffsetByGroupId[g.groupId][to.topic][parseInt(p.partition)] = p.offset     
            })
        })
        
        for (const m of g.members) {    
            const memberMetadata = AssignerProtocol.MemberMetadata.decode(m.memberMetadata)
            const memberAssignment = AssignerProtocol.MemberAssignment.decode(m.memberAssignment)
            for (const t of memberMetadata.topics) {

                const res = await admin.fetchTopicOffsets(t)    

                res.forEach(r => {
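                    // Check that the group has committed offsets for this topic before reading them
                    if (currentMapOfTopicOffsetByGroupId[g.groupId][t] !== undefined) {
                        // Lag = partition high watermark minus the group's committed offset
                        const lag = parseInt(r.high) - parseInt(currentMapOfTopicOffsetByGroupId[g.groupId][t][parseInt(r.partition)])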
                        status.push({
                            HOST: m.clientHost,
                            STATE: g.state,
                            MEMBER_ID: m.memberId,
                            GROUP_ID: g.groupId,
                            TOPIC: t,
                            PARTITION: r.partition,
                            OFFSET: r.offset,
                            C_OFFSET: parseInt(currentMapOfTopicOffsetByGroupId[g.groupId][t][parseInt(r.partition)]),
                            LAG: lag
                        })
                    }
                }) 
            }
        }
    }
    await admin.disconnect()
    return status
}
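
The core subtraction in the function above can be isolated into a pure helper, which makes it easier to test without a broker. The sketch below is illustrative only: partitionLags is a hypothetical name, the sample data is made up, and treating a committed offset of '-1' (or a missing entry) as "nothing committed yet" is an assumption. It expects input shaped like the result of admin.fetchTopicOffsets(topic) and like the partitions array of one topic entry from admin.fetchOffsets({ groupId }).

```javascript
// Sketch: per-partition lag for one topic of one consumer group.
// topicOffsets: [{ partition, offset, high, low }, ...]
// committed:    [{ partition, offset }, ...]
function partitionLags(topicOffsets, committed) {
  // Index the committed offsets by partition number for direct lookup
  const committedByPartition = new Map(
    committed.map(p => [Number(p.partition), p.offset])
  )
  return topicOffsets.map(({ partition, high, low }) => {
    const offset = committedByPartition.get(Number(partition))
    // Assumption: '-1' or a missing entry means nothing was committed yet,
    // so everything still retained in the partition counts as lag
    const consumed = offset === undefined || offset === '-1' ? BigInt(low) : BigInt(offset)
    return { partition, lag: (BigInt(high) - consumed).toString() }
  })
}

// Made-up sample data for illustration
const lags = partitionLags(
  [
    { partition: 0, offset: '147', high: '147', low: '0' },
    { partition: 1, offset: '10', high: '10', low: '4' },
  ],
  [
    { partition: 0, offset: '145' },
    { partition: 1, offset: '-1' },
  ]
)
console.log(lags) // [ { partition: 0, lag: '2' }, { partition: 1, lag: '6' } ]
```

Unlike the batch-based metrics, no "- 1" is applied here, because a committed offset is the offset of the next message the group will read.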
