实际运行 中的系统,难免 会遇到重新消费某条消息、 跳过 一段 时间内的消息等情况 。 这些异常情况的处理,都和 Offset 有关。
首先来明确一下 Offset 的含义, RocketMQ 中, 一 种类型的消息会放到 一 个 Topic 里,为了能够并行, 一般一个 Topic 会有多个 Message Queue (也可以 设置成一个), Offset是指某个 Topic下的一条消息在某个 Message Queue里的 位置,通过 Offset的值可以定位到这条消息,或者指示 Consumer从这条消息 开始向后继续处理 。
Offset主要分为本地文件类型和 Broker代存 的类型两种 。
Rocketmq集群有两种消费模式
使用DefaultMQPushConsumer的时候,我们不用关心OffsetStore的 事,使用PullConsumer,我们就要自己处理 OffsetStore。
DefaultMQPushConsumer类里有个函数用来设置从哪儿开始消费 消 息:比如 setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_ FIRST_OFFSET),这个语句设置从最小的 Offset开始读取。
如果从队列开始到 感兴趣的消息之间有很大的范围,用 CONSUME_FROM_FIRST_OFFSET参数 就不合适了,可以设置从某个时间开始消 费消息 , 比如 Consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP), Consumer. setConsumeTimestamp("20131223171201”), 时间戳格式是精确到秒的 。
/**
* Consumer从哪里开始消费<br>
*
* @author shijia.wxr<vintage.wang@gmail.com>
*/
public enum ConsumeFromWhere {
/**
* 一个新的订阅组第一次启动从队列的最后位置开始消费<br>
* 后续再启动接着上次消费的进度开始消费
*/
CONSUME_FROM_LAST_OFFSET,
@Deprecated
CONSUME_FROM_LAST_OFFSET_AND_FROM_MIN_WHEN_BOOT_FIRST,
@Deprecated
CONSUME_FROM_MIN_OFFSET,
@Deprecated
CONSUME_FROM_MAX_OFFSET,
/**
* 一个新的订阅组第一次启动从队列的最前位置开始消费<br>
* 后续再启动接着上次消费的进度开始消费
*/
CONSUME_FROM_FIRST_OFFSET,
/**
* 一个新的订阅组第一次启动从指定时间点开始消费<br>
* 后续再启动接着上次消费的进度开始消费<br>
* 时间点设置参见DefaultMQPushConsumer.consumeTimestamp参数
*/
CONSUME_FROM_TIMESTAMP,
}
注意设置读取位置不是每次都有效,它的优先级默认在 Offset Store后面 , 比如 在 DefaultMQPushConsumer 的 BROADCASTING 方式 下 ,默 认 是 从 Broker 里读取某个 Topic 对 应 ConsumerGroup 的 Offset, 当读 取不到 Offset 的时候, ConsumeFromWhere 的设置才生效 。 大部分情况下这个设置在 Consumer Group初次启动时有效。 如果 Consumer正常运行后被停止, 然后再启动, 会 接着上次的 Offset开始消费, ConsumeFromWhere 的设置元效。
版权说明 : 本文为转载文章, 版权归原作者所有 版权申明
原文链接 : https://blog.csdn.net/weixin_38003389/article/details/86674507
内容来源于网络,如有侵权,请联系作者删除!