我正在使用aws(msk)中托管的kafka,我有一个简单的生产者/消费者,它以前是工作的。如果我和一个当地的 Docker Kafka(单一经纪人)做比较,生产者和消费者似乎都很好
但当我改用aws-kafka(msk)时,消费者就永远等不到消息了。然后,我尝试使用命令行消费者对我的.net生产者使用awsKafka(msk),这是确定的。
因此,如果我将一些消息从prodcuer发布到aws kafka,并对aws kafka使用命令行consumer,就可以了
我在docker中这样运行命令行consumer
docker run --net=confluent --rm confluentinc/cp-kafka:5.0.0 kafka-console-consumer --bootstrap-server XXXXXX,XXXX,XXXX --topic floop --from-beginning
所以这很好,所以我知道消息正在被发布到“floop”主题,并且可以通过aws kafka消费。只是c#消费者似乎无法分配任何分区,只是在消费时挂起。这是我的消费代码的简化版本
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Confluent.Kafka;
namespace Consumer
{
class Program
{
public static void Main(string[] args)
{
var conf = new ConsumerConfig
{
GroupId = "test-consumer",
BootstrapServers = "XXXX,XXXXXX,XXXXX",
AutoOffsetReset = AutoOffsetReset.Earliest
};
using (var c = new ConsumerBuilder<Ignore, string>(conf).Build())
{
c.Subscribe("floop");
CancellationTokenSource cts = new CancellationTokenSource();
Console.CancelKeyPress += (_, e) => {
e.Cancel = true; // prevent the process from terminating.
cts.Cancel();
};
try
{
while (true)
{
try
{
var cr = c.Consume(cts.Token);
Console.WriteLine($"Message '{cr.Value}' at: '{cr.TopicPartitionOffset}'.");
}
catch (ConsumeException e)
{
Console.WriteLine($"Error: {e.Error.Reason}");
}
}
}
catch (OperationCanceledException)
{
c.Close();
}
}
}
}
}
有人有什么建议吗?
我能找到的最接近的就是这个
lambda for java消费者收听msk主题时出现问题
暂无答案!
目前还没有任何答案,快来回答吧!