kafka客户机单独或批处理提交消息

t9eec4r0  于 2021-06-04  发布在  Kafka
关注(0)|答案(2)|浏览(423)

我试图让kafka消费者在特定的时间内收集消息,然后我可以手动提交已收集的消息。但是我找不到shopify sarama中可以用来提交一条消息或一批消息的方法或api,请帮忙

mnemlml8

mnemlml81#

使用autocommit时,无论如何你都不能完全控制它的发生。它是周期性的,发生在你的幕后。如果不适合你,你也可以用 ConsumerGroupSession.MarkOffset(topic string, partition int32, offset int64, metadata string) 用于在任何时候(即使在特定的时间之后)提交一个偏移量作为一批已消耗消息的最后一个。

5jvtdoz2

5jvtdoz22#

可以使用偏移量配置的autocommit字段的interval参数:

// Offsets specifies configuration for how and when to commit consumed
// offsets. This currently requires the manual use of an OffsetManager
// but will eventually be automated.
Offsets struct {
    // Deprecated: CommitInterval exists for historical compatibility
    // and should not be used. Please use Consumer.Offsets.AutoCommit
    CommitInterval time.Duration

    // AutoCommit specifies configuration for commit messages automatically.
    AutoCommit struct {
        // Whether or not to auto-commit updated offsets back to the broker.
        // (default enabled).
        Enable bool

        // How frequently to commit updated offsets. Ineffective unless
        // auto-commit is enabled (default 1s)
        Interval time.Duration
    }

例如:

// init (custom) config, enable errors and notifications
config := cluster.NewConfig()
...

// Autocommit after two minutes
config.Consumer.Offsets.AutoCommit.Interval = 2 * time.Minute

相关问题