在我们的nest.js
应用程序中,我们使用Kafka的kafkajs客户端。我们需要获取机会监视器统计数据。其中一个指标是lag
。
试图弄清楚kafkajs是否提供了任何有趣的东西。(有效负载中最有趣的事情是:timestamp
、offset
、batchContext.firstOffset
、batchContext.firstTimestamp
、batchContext.maxTimestamp
)
问题
如何记录lag
值和kafkajs
提供的其他统计信息?
我是否应该考虑实现自己的统计监视器来收集使用kafka.js
客户端的节点应用程序中所需的信息?
新详细信息1
下面的文档可以得到batch.highWatermark
,其中batch.highWatermark
是主题分区内最后提交的偏移量。它可以用于计算滞后。
尝试
await consumer.run({
eachBatchAutoResolve: true,
eachBatch: async (data) => {
console.log('Received data.batch.messages: ', data.batch.messages)
console.log('Received data.batch.highWatermark: ', data.batch.highWatermark)
},
})
我可以得到下一条信息:
Received data.batch.messages: [
{
magicByte: 2,
attributes: 0,
timestamp: '1628877419958',
offset: '144',
key: null,
value: <Buffer 68 65 6c 6c 6f 21>,
headers: {},
isControlRecord: false,
batchContext: {
firstOffset: '144',
firstTimestamp: '1628877419958',
partitionLeaderEpoch: 0,
inTransaction: false,
isControlBatch: false,
lastOffsetDelta: 2,
producerId: '-1',
producerEpoch: 0,
firstSequence: 0,
maxTimestamp: '1628877419958',
timestampType: 0,
magicByte: 2
}
},
{
magicByte: 2,
attributes: 0,
timestamp: '1628877419958',
offset: '145',
key: null,
value: <Buffer 6f 74 68 65 72 20 6d 65 73 73 61 67 65>,
headers: {},
isControlRecord: false,
batchContext: {
firstOffset: '144',
firstTimestamp: '1628877419958',
partitionLeaderEpoch: 0,
inTransaction: false,
isControlBatch: false,
lastOffsetDelta: 2,
producerId: '-1',
producerEpoch: 0,
firstSequence: 0,
maxTimestamp: '1628877419958',
timestampType: 0,
magicByte: 2
}
},
{
magicByte: 2,
attributes: 0,
timestamp: '1628877419958',
offset: '146',
key: null,
value: <Buffer 6d 6f 72 65 20 6d 65 73 73 61 67 65 73>,
headers: {},
isControlRecord: false,
batchContext: {
firstOffset: '144',
firstTimestamp: '1628877419958',
partitionLeaderEpoch: 0,
inTransaction: false,
isControlBatch: false,
lastOffsetDelta: 2,
producerId: '-1',
producerEpoch: 0,
firstSequence: 0,
maxTimestamp: '1628877419958',
timestampType: 0,
magicByte: 2
}
}
]
Received data.batch.highWatermark: 147
有什么想法如何使用batch.highWatermark
在标签计算呢?
2条答案
按热度按时间yfwxisqw1#
看起来获取偏移滞后度量的唯一方法是使用检测事件:
offsetLagLow
测量批处理中第一条消息与分区中最后一个偏移量之间的偏移量增量(highWatermark
)。您也可以使用offsetLag
,但它基于批处理的最后一个偏移量。正如@Sergii提到的,当你使用
eachBatch
时,有一些props可以直接使用(here是batch
prop上所有可用的方法)。但是如果你使用eachMessage
,你就不会得到这些props。所以插装事件是最通用的方法。7uzetpgm2#
这是我们用来计算每个客户端/组/主题/分区的滞后的代码。