我正在我的项目中尝试kafka的poc,并使用confluent.kafka库在.net core 2.1中创建了两个控制台应用程序。我已经在ubuntubox上安装了kafka,它运行正常。当我使用producer console应用程序将数千条消息推入kafka并在消息中添加序列号时。当我在我的消费者控制台应用程序中使用这些消息时,消息的顺序不正确。只有一个生产者和消费者,他们都与一个主题有关。下面是我的制作人的代码
public class Kafta
{
private Dictionary<string, object> config;
private string topicName;
public Kafta(string topic)
{
config = new Dictionary<string, object>
{
{"bootstrap.servers","192.168.60.173:9092" }
};
topicName = topic;
}
public async void SendMessageAsync(string message)
{
using (var producer = new Producer<string, string>(config, new StringSerializer(Encoding.UTF8), new StringSerializer(Encoding.UTF8)))
{
var msg = await producer.ProduceAsync(topicName, "userid", message);
//producer.ProduceAsync("console", null, message);
producer.Flush(500);
}
}
}
program.cs生产商
static void Main(string[] args)
{
string topic = "tester2";
long count = 1;
Console.WriteLine("Starting to send message");
Console.WriteLine("Write the message here: ");
if(args.Length == 2)
{
topic = args[0];
count = long.Parse(args[1]);
}
try
{
Console.WriteLine("Topic name " + topic);
var message = Console.ReadLine();
var service = new Kafta(topic);
for(var i = 0; i<count;i++)
{
var msg = message + " number " + i.ToString();
Console.WriteLine("Message to Kafta: " + msg);
service.SendMessageAsync(msg);
}
}
catch (Exception ex)
{
Console.WriteLine("Exception occured " + ex.Message);
}
finally
{
Console.WriteLine("Press any key to exit");
Console.Read();
}
}
消费者代码
static void Main(string[] args)
{
var config = new Dictionary<string, object>
{
{ "group.id", "sample-consumer" },
{ "bootstrap.servers", "192.168.60.173:9092" },
{ "enable.auto.commit", "false"}
};
string topic = "tester2";
if (args.Length == 1)
topic = args[0];
using (var consumer = new Consumer<string, string>(config, new StringDeserializer(Encoding.UTF8), new StringDeserializer(Encoding.UTF8)))
{
consumer.Subscribe(new string[] { topic });
consumer.OnMessage += (_, msg) =>
{
Console.WriteLine($"Topic: {msg.Topic} Partition: {msg.Partition} Offset: {msg.Offset} {msg.Value}");
consumer.CommitAsync(msg);
};
while (true)
{
consumer.Poll(100);
}
}
}
生产商产量
Message to Kafta: message number 0
Message to Kafta: message number 1
Message to Kafta: message number 2
Message to Kafta: message number 3
Message to Kafta: message number 4
Message to Kafta: message number 5
Message to Kafta: message number 6
Message to Kafta: message number 7
Message to Kafta: message number 8
Message to Kafta: message number 9
用户输出:
message number 4
message number 7
message number 0
message number 1
message number 2
message number 3
message number 5
message number 6
message number 8
message number 9
我是新来Kafka,不知道我错过了什么,使它正常工作。根据kafka文档,我的用例可以保证消息的顺序,所以肯定有一些愚蠢的错误,我正在做,无法弄清楚。
除了Kafka我还有别的选择吗?
谢谢
2条答案
按热度按时间eh57zj3b1#
根据Kafka文件,信息的订购是有保证的
仅限于每个分区。从你的问题来看,你没有提到你的主题有多少个分区。你在印刷
Topic: {msg.Topic} Partition: {msg.Partition}
,但这不是你的帖子的输出。。在你的制作人里,你是在和
SendMessageAsync
并且不验证代理是否实际收到了带有该方法返回值的消息。所以这是一种可能性-您的print语句将按顺序排列,但消息不一定以这种方式到达代理。如果代码中显示的使用者输出中的分区号总是相同的,而我不熟悉c#api,那么看起来您使用的是非阻塞使用者消息侦听器。那个
OnMessage
函数很可能在一个单独的线程中被调用,这个线程不一定以保证的顺序写入标准输出。更好的测试可能是在每条消息中插入一个时间戳,而不仅仅是一个计数器除了Kafka我还有别的选择吗?
还有很多其他的mq技术,比如rabbitmq,所以如果您不关心kafka的持久性特性和其他api(streams和connect),可以随意使用它们
4smxwvx52#
正如@cricket\u007所提到的,拥有一个主题和多个分区意味着只有从一个分区接收到的数据是有序的。
当您创建一个使用者(只有一个)时,需要所有分区才能从中读取消息。然后,来自分区的数据会同步显示为红色(是),但您从中接收消息的分区会随之更改。
假设您用4个分区为主题生成了100条消息。为了简洁起见,假设每个分区存储25条消息。当您启动消费者时,它会收到如下消息(示例):
来自分区1的5条消息
来自分区2的4条消息
6来自分区3的消息
来自分区4的2条消息
来自分区1的3条消息
...
这是因为使用者试图均匀地读取所有分区。