在aws(msk)中使用托管kafka时,kafka.net客户端未接收任何消息

tp5buhyn  于 2021-06-04  发布在  Kafka
关注(0)|答案(0)|浏览(321)

我正在使用aws(msk)中托管的kafka,我有一个简单的生产者/消费者,它以前是工作的。如果我和一个当地的 Docker Kafka(单一经纪人)做比较,生产者和消费者似乎都很好
但当我改用aws-kafka(msk)时,消费者就永远等不到消息了。然后,我尝试使用命令行消费者对我的.net生产者使用awsKafka(msk),这是确定的。
因此,如果我将一些消息从prodcuer发布到aws kafka,并对aws kafka使用命令行consumer,就可以了
我在docker中这样运行命令行consumer

docker run --net=confluent --rm confluentinc/cp-kafka:5.0.0 kafka-console-consumer --bootstrap-server XXXXXX,XXXX,XXXX --topic floop --from-beginning


所以这很好,所以我知道消息正在被发布到“floop”主题,并且可以通过aws kafka消费。只是c#消费者似乎无法分配任何分区,只是在消费时挂起。这是我的消费代码的简化版本

using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Confluent.Kafka;

namespace Consumer
{
    class Program
    {
        public static void Main(string[] args)
        {
            var conf = new ConsumerConfig
            {
                GroupId = "test-consumer",
                BootstrapServers = "XXXX,XXXXXX,XXXXX",
                AutoOffsetReset = AutoOffsetReset.Earliest
            };

            using (var c = new ConsumerBuilder<Ignore, string>(conf).Build())
            {
                c.Subscribe("floop");

                CancellationTokenSource cts = new CancellationTokenSource();
                Console.CancelKeyPress += (_, e) => {
                    e.Cancel = true; // prevent the process from terminating.
                    cts.Cancel();
                };

                try
                {
                    while (true)
                    {
                        try
                        {
                            var cr = c.Consume(cts.Token);
                            Console.WriteLine($"Message '{cr.Value}' at: '{cr.TopicPartitionOffset}'.");
                        }
                        catch (ConsumeException e)
                        {
                            Console.WriteLine($"Error: {e.Error.Reason}");
                        }
                    }
                }
                catch (OperationCanceledException)
                {
                    c.Close();
                }
            }
        }
    }
}

有人有什么建议吗?
我能找到的最接近的就是这个
lambda for java消费者收听msk主题时出现问题

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题