如何使用www.example.com核心为消费者创建Kafka DLQasp.net
var kafkaConfig = new ConsumerConfig
{
GroupId = _configuration["ConsumerGroup"],
BootstrapServers = _configuration["KafkaServer"]
};
using (var consumer = new ConsumerBuilder<int, string>(kafkaConfig).Build())
{
consumer.Subscribe(_configuration["Topic"]);
try
{
while (!stoppingToken.IsCancellationRequested)
{
try
{
var consumeResult = consumer.Consume(stoppingToken);
Console.WriteLine($"Consumed message '{consumeResult.Value}' at: '{consumeResult.TopicPartitionOffset}'.");
if (!TryConsume(consumeResult, stoppingToken))
{
await _retryQueueProducer.RetryAsync(consumeResult);
}
}
catch (ConsumeException e)
{
Console.WriteLine($"Error occured: {e.Error.Reason}");
}
}
}
finally
{
// Ensure the consumer leaves the group cleanly and final offsets are committed.
consumer.Close();
}
}
上面一个是示例代码
如何在DLQ中显示失败的消息值
1条答案
按热度按时间f3temu5u1#
您知道如何使用Producer,并且知道如何编写try catch。
这就是你所需要的。从一个消费的记录中捕获异常,然后使用一个生成器发送到一个新的主题,你现在有一个DLQ。
编写一个新消费者以订阅DLQ。
您应该在代码之外创建实际的主题。