拆分要发送到Azure服务总线的消息批

2q5ifsrm  于 2023-01-09  发布在  其他
关注(0)|答案(3)|浏览(158)

假设我有一个消息集合List<BrokeredMessage>,我希望将其批量发送到Azure服务总线。
集合大小是任意的,因此所有消息的总大小可能超过Service Bus施加的256 k限制。如何以最佳方式将其拆分为较小的块?
这项任务看似简单,但似乎并非如此:每个BrokeredMessage的大小在我尝试发送它之前是未知的。Size属性只返回消息正文的大小,没有标题和其他开销。
如果我尝试发送1000条消息,每条消息的主体为250字节,我将得到MessageSizeExceededException,问题是现在我甚至不能重试,因为消息已经被使用,所以我必须重新创建所有的BrokeredMessage
因此,我目前看到的唯一方法是在发送大量小消息时对批处理大小非常保守,这可能会消耗一些吞吐量。
是否有更可靠和/或更干净的方法?

lvmkulzt

lvmkulzt1#

因此,我目前看到的唯一方法是在发送大量小消息时对批处理大小非常保守,这可能会消耗一些吞吐量。
这不仅会消耗吞吐量,而且会消耗可靠性。当使用MessageSender.SendBatchAsync()时,所有消息都作为一个原子操作发送,并且一起成功或失败。
是否有更可靠和/或更干净的方法
使用TransactionScope Package 所有的发送将达到相同的效果,但是您将不再以批处理的形式发送消息。
如果你仍然想发送批 * 和 * 确保你不陷入大小/计数问题,就像建议你可以分块你的发送。不幸的是,Size属性是一个大小估计不去。它报告正文**之前 * 序列化。除非使用Stream,然后序列化没有应用。然后事件,标准属性和自定义属性仍然会扭曲您的大小。重新排列WindowsAzure. ServiceBus的文档时,BrokeredMessage的MSDN API文档中丢失了以下注解:
若要获取BrokeredMessage大小的准确值,应在完成对BrokeredMessage的发送/接收操作后读取Size属性。
我已经采取了一种基于估计大小的分块方法。估计大小是基于一定的填充百分比来膨胀消息的大小,预计平均消息将小于填充大小。加上基于字符串的属性的平均假定大小。在这个blog post中,我'我已经阐述了估计单个消息大小的思想,该消息大小将用于计算可以作为批处理发出的块大小。

pkwftd7m

pkwftd7m2#

为了完成Sean的回答,Paolo Salvatori为MessageSender类编写了一些扩展方法,您可以在此处找到:

基本上,它会迭代所有消息并对其进行批处理,因此大小不会超过最大批处理大小。
我遇到了一些问题,因为BrokeredMessage.Size没有考虑BrokerMessageProperties。我稍微修改了他的版本,添加了Properties大小:

/// <summary>
/// This class contains extensions methods for the <see cref="MessageSender"/> class.
/// </summary>
public static class MessageSenderExtensions
{
    private const string BrokeredMessageListCannotBeNullOrEmpty = "The brokeredMessageEnumerable parameter cannot be null or empty.";
    private const string SendPartitionedBatchFormat = "[MessageSender.SendPartitionedBatch] Batch Sent: BatchSizeInBytes=[{0}] MessageCount=[{1}]";
    private const string SendPartitionedBatchAsyncFormat = "[MessageSender.SendPartitionedBatchAsync] Batch Sent: BatchSizeInBytes=[{0}] MessageCount=[{1}]";
    private const int MaxBathSizeInBytes = 262144;

    /// <summary>
    /// Sends a set of brokered messages (for batch processing). 
    /// If the batch size is greater than the maximum batch size, 
    /// the method partitions the original batch into multiple batches, 
    /// each smaller in size than the maximum batch size.
    /// </summary>
    /// <param name="messageSender">The current <see cref="MessageSender"/> object.</param>
    /// <param name="messages">The collection of brokered messages to send.</param>
    /// <param name="trace">true to cause a message to be written; otherwise, false.</param>
    /// <returns>The asynchronous operation.</returns>
    public async static Task SendPartitionedBatchAsync(this MessageSender messageSender, IEnumerable<BrokeredMessage> messages, bool trace = false)
    {
        var brokeredMessageList = messages as IList<BrokeredMessage> ?? messages.ToList();
        if (messages == null || !brokeredMessageList.Any())
        {
            throw new ArgumentNullException(BrokeredMessageListCannotBeNullOrEmpty);
        }

        var batchList = new List<BrokeredMessage>();
        long batchSize = 0;

        foreach (var brokeredMessage in brokeredMessageList)
        {
            // Hack because the size of the brokered message does not take into account the size of the properties
            var messageSize = GetObjectSize(brokeredMessage.Properties) + brokeredMessage.Size;
            if ((batchSize + messageSize) > MaxBathSizeInBytes)
            {
                // Send current batch
                await messageSender.SendBatchAsync(batchList);
                Trace.WriteLineIf(trace, string.Format(SendPartitionedBatchAsyncFormat, batchSize, batchList.Count));

                // Initialize a new batch
                batchList = new List<BrokeredMessage> { brokeredMessage };
                batchSize = messageSize;
            }
            else
            {
                // Add the BrokeredMessage to the current batch
                batchList.Add(brokeredMessage);
                batchSize += messageSize;
            }
        }

        // The final batch is sent outside of the loop
        await messageSender.SendBatchAsync(batchList);
        Trace.WriteLineIf(trace, string.Format(SendPartitionedBatchAsyncFormat, batchSize, batchList.Count));
    }

    /// <summary>
    /// Calculates the lenght in bytes of an object and returns the size.
    /// </summary>
    private static int GetObjectSize(object objectToTest)
    {
        var bf = new BinaryFormatter();
        using (var ms = new MemoryStream())
        {
            bf.Serialize(ms, objectToTest);
            return ms.ToArray().Length;
        }
    }
}
xuo3flqw

xuo3flqw3#

current library中,ServiceBusMessageBatch类有一个方法,允许您查看添加消息是否会超过批处理大小,允许您编写如下内容...

public async Task SendBatchAsync<T>(ServiceBusSender sender, IEnumerable<ServiceBusMessage> messages)
{
    // Don't use using - we want to explicitly control batch lifecycle
    var batch = await sender.CreateMessageBatchAsync();

    foreach (var message in messages)
    {
        if (!batch.TryAddMessage(message))
        {
            // Batch is full, so send this batch
            await sender.SendMessagesAsync(batch);
            batch.Dispose();

            // And create a new batch starting with this message
            batch = await sender.CreateMessageBatchAsync();
            batch.TryAddMessage(message);
        }
    }

    // Send the final batch
    await sender.SendMessagesAsync(batch);
    batch.Dispose();
}

相关问题