在本地文件中存储偏移量

z9smfwbn  于 2021-06-08  发布在  Kafka
关注(0)|答案(0)|浏览(233)

我使用的是confluent.kafka 0.9.5,下面是它的例子。
与它的示例不同的是:我想在本地存储偏移量。所以我初始化了如下配置。

private static Dictionary<string, object> ConstructConfig(string brokerList)
    {
        string dir = Path.Combine(Path.GetDirectoryName(Assembly.GetExecutingAssembly().Location), "cache", "kafka");
        if (!Directory.Exists(dir))
            Directory.CreateDirectory(dir);
        return new Dictionary<string, object>
        {
            { "group.id", Environment.GetEnvironmentVariable("Cluster.Name") },
            { "offset.store.method", "file" },  // Offset commit store method: 'file' - local file store 
            { "offset.store.path", dir }, // Path to local file for storing offsets. 
            { "offset.store.sync.interval.ms", 0 }, // Use -1 to disable syncing; and 0 for immediate sync after each write. 
            { "enable.auto.commit", false },
            { "statistics.interval.ms", 60000 },
            { "bootstrap.servers", brokerList },
            { "default.topic.config", new Dictionary<string, object>()
                {
                    { "auto.offset.reset", "smallest" }
                }
            }
        };
    }

我成功地收到消息,并打电话给 CommitAsync 他们每个人。

while (true)
           {
               try
               {
                   msg = null;
                   if (!consumer.Consume(out msg, ts))
                       continue;
                   OnMessage(msg);

                   log.InfoFormat($"Offset = {msg.Offset}");

                   await consumer.CommitAsync(msg);
               }
               catch (Exception ex)
               {
                   // ...
                   break;
               }
           }

但目标目录中没有创建任何文件。当我再次启动应用程序时,我会收到相同的消息。我当然有书面许可 offset.store.path 为什么没有存储偏移量?

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题