我使用的是confluent.kafka 0.9.5,下面是它的例子。
与它的示例不同的是:我想在本地存储偏移量。所以我初始化了如下配置。
private static Dictionary<string, object> ConstructConfig(string brokerList)
{
string dir = Path.Combine(Path.GetDirectoryName(Assembly.GetExecutingAssembly().Location), "cache", "kafka");
if (!Directory.Exists(dir))
Directory.CreateDirectory(dir);
return new Dictionary<string, object>
{
{ "group.id", Environment.GetEnvironmentVariable("Cluster.Name") },
{ "offset.store.method", "file" }, // Offset commit store method: 'file' - local file store
{ "offset.store.path", dir }, // Path to local file for storing offsets.
{ "offset.store.sync.interval.ms", 0 }, // Use -1 to disable syncing; and 0 for immediate sync after each write.
{ "enable.auto.commit", false },
{ "statistics.interval.ms", 60000 },
{ "bootstrap.servers", brokerList },
{ "default.topic.config", new Dictionary<string, object>()
{
{ "auto.offset.reset", "smallest" }
}
}
};
}
我成功地收到消息,并打电话给 CommitAsync
他们每个人。
while (true)
{
try
{
msg = null;
if (!consumer.Consume(out msg, ts))
continue;
OnMessage(msg);
log.InfoFormat($"Offset = {msg.Offset}");
await consumer.CommitAsync(msg);
}
catch (Exception ex)
{
// ...
break;
}
}
但目标目录中没有创建任何文件。当我再次启动应用程序时,我会收到相同的消息。我当然有书面许可 offset.store.path
为什么没有存储偏移量?
暂无答案!
目前还没有任何答案,快来回答吧!