rabbitmq 成批处理/使用消息

wn9m85ua  于 2022-12-29  发布在  RabbitMQ
关注(0)|答案(2)|浏览(237)

我对消费信息有清晰的理解:http://docs.masstransit-project.com/en/latest/usage/consumer.html
这些实现一次仅处理一个消息。
我需要一次处理多条消息,批量处理。

jq6vz3qz

jq6vz3qz1#

大众运输现在有一个实验性的功能,处理个别消息的一批。
配置总线:

_massTransitBus = Bus.Factory.CreateUsingRabbitMq(
                cfg =>
                    {
                        var host = cfg.Host(new Uri("amqp://@localhost"),
                            cfg =>
                                {
                                    cfg.Username("");
                                    cfg.Password("");
                                });

                        cfg.ReceiveEndpoint(
                            host,
                            "queuename",
                            e =>
                                {
                                    e.PrefetchCount = 30;
                                    e.Batch<MySingularEvent>(
                                        ss =>
                                            {
                                                ss.MessageLimit = 30;
                                                ss.TimeLimit = TimeSpan.FromMilliseconds(1000);
                                                ss.Consumer(() => new BatchSingularEventConsumer());
                                            });
                                });
                    });

创造你的消费者:

public class BatchSingularEventConsumer: IConsumer<Batch<MySingularEvent>>
{
    public Task Consume(ConsumeContext<Batch<MySingularEvent>> context)
    {           
         Console.WriteLine($"Number of messages consumed {context.Message.Length}");
         return Task.CompletedTask;
    }
}

您可以使用消息限制时间限制配置批处理。
我建议阅读ChrisPatterson关于Batch Message Consumption问题的文章,特别是关于预取的部分
批处理大小必须小于或等于任何预取计数或并发消息传递限制,才能达到大小限制。如果其他限制阻止达到批处理大小,则永远不会调用使用者。
批量消费在大众运输网站上也是documented

06odsfpq

06odsfpq2#

事实证明,今天你可以做到这一点:

public class MyConsumer : IConsumer<Batch<MyMessage>>
{
    public async Task Consume(ConsumeContext<Batch<MyMessage>> context)
    {
        ...
    }
}

相关问题