// Offsets specifies configuration for how and when to commit consumed
// offsets. This currently requires the manual use of an OffsetManager
// but will eventually be automated.
Offsets struct {
// Deprecated: CommitInterval exists for historical compatibility
// and should not be used. Please use Consumer.Offsets.AutoCommit
CommitInterval time.Duration
// AutoCommit specifies configuration for commit messages automatically.
AutoCommit struct {
// Whether or not to auto-commit updated offsets back to the broker.
// (default enabled).
Enable bool
// How frequently to commit updated offsets. Ineffective unless
// auto-commit is enabled (default 1s)
Interval time.Duration
}
例如:
// init (custom) config, enable errors and notifications
config := cluster.NewConfig()
...
// Autocommit after two minutes
config.Consumer.Offsets.AutoCommit.Interval = 2 * time.Minute
2条答案
按热度按时间mnemlml81#
使用autocommit时,无论如何你都不能完全控制它的发生。它是周期性的,发生在你的幕后。如果不适合你,你也可以用
ConsumerGroupSession.MarkOffset(topic string, partition int32, offset int64, metadata string)
用于在任何时候(即使在特定的时间之后)提交一个偏移量作为一批已消耗消息的最后一个。5jvtdoz22#
可以使用偏移量配置的autocommit字段的interval参数:
例如: