假设我正在调试一个问题,该问题涉及生成和使用的单个特定消息。我想知道这个信息是什么时候产生的,什么时候被消费的。我有什么办法获得这些信息?
我想当我构建一个消息时,我可以在当前时间内包含它。当我的消费者收到一条消息时,它可以写出一个日志条目。
但是假设我有许多producer和consumer类,但是没有一个代码在做这些事情。Kafka是否已经有了某种东西,可以支持在不必接触这些生产者和消费者的实现的情况下,找到有关特定消息的信息,比如 __consumer_offsets
主题?
假设我正在调试一个问题,该问题涉及生成和使用的单个特定消息。我想知道这个信息是什么时候产生的,什么时候被消费的。我有什么办法获得这些信息?
我想当我构建一个消息时,我可以在当前时间内包含它。当我的消费者收到一条消息时,它可以写出一个日志条目。
但是假设我有许多producer和consumer类,但是没有一个代码在做这些事情。Kafka是否已经有了某种东西,可以支持在不必接触这些生产者和消费者的实现的情况下,找到有关特定消息的信息,比如 __consumer_offsets
主题?
2条答案
按热度按时间goqiplq21#
kafka对发送的消息具有内置的时间戳支持,可以通过consumerrecord(链接)的时间戳方法访问该时间戳
它可以用broker config配置(
log.message.timestamp.type
)或主题级配置(message.timestamp.type
). 其默认值为CreateTime
. 也可以将其设置为LogAppendTime
.createtime:创建生产者记录时(发送前)分配时间戳。
logappendtime:代理将用当前本地时间覆盖时间戳,并将消息附加到日志中。
对于消费时间戳,您唯一的选择是在消息处理完成后获取系统的当前时间。
有关时间戳的更多信息,请检查此项。
zphenhs42#
当涉及到消费时,没有明确的方法来指定消息的消费时间(还要记住,一条消息可以被多个消费组的消费者消费多次)。
不过,有几种可能的方法可以自己追踪:
在接收到记录后(在
.poll(...)
呼叫返回),如果使用使用者组,请监视使用者组的偏移量或查看中的值
__consumer_offsets
(这将要求您反序列化内部格式—有关详细信息,请参阅此答案(请记住,本主题中记录的时间戳与使用者的提交时间戳相对应,因此它们需要提交的次数足以提供正确的granurality,日志压缩+自定义实现:使用与标记消费时间戳相同的键和值发送消息(但是,在压缩发生之前,仍然可以重新读取消息)。