Azure:如何将邮件从毒药队列移回主队列?

3bygqnnd  于 2023-05-18  发布在  其他
关注(0)|答案(9)|浏览(74)

我想知道是否有一个工具或库,可以移动队列之间的消息?目前,我正在做的事情如下

public static void ProcessQueueMessage([QueueTrigger("myqueue-poison")] string message, TextWriter log)
{
    CloudStorageAccount storageAccount = CloudStorageAccount.Parse(connString);
    CloudQueueClient queueClient = storageAccount.CreateCloudQueueClient();
    CloudQueue queue = queueClient.GetQueueReference("myqueue");
    queue.CreateIfNotExists();

    var messageData = JsonConvert.SerializeObject(data, new JsonSerializerSettings { ContractResolver = new CamelCasePropertyNamesContractResolver() });
    queue.AddMessage(new CloudQueueMessage(messageData));
}
c9qzyr3d

c9qzyr3d1#

截至(2018-09-11),Microsoft Azure Storage Explorer的1.4.1版本不具备将消息从一个Azure队列移动到另一个队列的能力。
blogged一个简单的解决方案,将有毒邮件传输回原始队列,并认为这可能会保存一些人几分钟。显然,您需要修复导致消息最终进入有毒消息队列的错误!
您需要添加一个NuGet包引用到Microsoft.NET.Sdk.Functions

using Microsoft.WindowsAzure.Storage;
using Microsoft.WindowsAzure.Storage.Queue;

void Main()
{
    const string queuename = "MyQueueName";

    string storageAccountString = "xxxxxx";

    RetryPoisonMesssages(storageAccountString, queuename);
}

private static int RetryPoisonMesssages(string storageAccountString, string queuename)
{
    CloudQueue targetqueue = GetCloudQueueRef(storageAccountString, queuename);
    CloudQueue poisonqueue = GetCloudQueueRef(storageAccountString, queuename + "-poison");

    int count = 0;
    while (true)
    {
        var msg = poisonqueue.GetMessage();
        if (msg == null)
            break;

        poisonqueue.DeleteMessage(msg);
        targetqueue.AddMessage(msg);
        count++;
    }

    return count;
}

private static CloudQueue GetCloudQueueRef(string storageAccountString, string queuename)
{
    CloudStorageAccount storageAccount = CloudStorageAccount.Parse(storageAccountString);
    CloudQueueClient queueClient = storageAccount.CreateCloudQueueClient();
    CloudQueue queue = queueClient.GetQueueReference(queuename);

    return queue;
}
kr98yfug

kr98yfug2#

Azure Storage Explorer版本1.15.0现在可以在2020年实现这一点。https://github.com/microsoft/AzureStorageExplorer/issues/1064

cyej8jka

cyej8jka3#

Azure Storage基本上不支持将消息从一个队列移动到另一个队列。你得靠自己去做。
实现将消息从一个队列移动到另一个队列的一种方法是将消息从源队列中出队(通过调用GetMessages),读取消息的内容,然后在目标队列中创建新消息。您可以通过使用存储客户端库来实现这一点。
我想到的一个移动消息的工具是Cerebrata Azure Management Studio(付费产品,15天免费试用)。它有这个功能。
截至(2018-09-11),Microsoft Azure Storage Explorer的版本1.4.1不支持移动队列消息。

abithluo

abithluo4#

下面是Mitch回答的更新版本,使用的是最新的Microsoft.Azure.Storage.Queue包。只需创建一个新的.NET Console应用程序,将上面提到的包添加到其中,并将Program.cs的内容替换为以下内容:

using Microsoft.Azure.Storage;
using Microsoft.Azure.Storage.Queue;
using System.Threading.Tasks;

namespace PoisonMessageDequeuer
{
    class Program
    {
        static async Task Main(string[] args)
        {
            const string queuename = "MyQueueName";

            string storageAccountString = "xxx";

            await RetryPoisonMesssages(storageAccountString, queuename);
        }

        private static async Task<int> RetryPoisonMesssages(string storageAccountString, string queuename)
        {
            var targetqueue = GetCloudQueueRef(storageAccountString, queuename);
            var poisonqueue = GetCloudQueueRef(storageAccountString, queuename + "-poison");

            var count = 0;
            while (true)
            {
                var msg = await poisonqueue.GetMessageAsync();
                if (msg == null)
                    break;

                await poisonqueue.DeleteMessageAsync(msg);
                await targetqueue.AddMessageAsync(msg);
                
                count++;
            }

            return count;
        }

        private static CloudQueue GetCloudQueueRef(string storageAccountString, string queuename)
        {
            var storageAccount = CloudStorageAccount.Parse(storageAccountString);
            var queueClient = storageAccount.CreateCloudQueueClient();
            var queue = queueClient.GetQueueReference(queuename);

            return queue;
        }
    }
}

但是如果处理的消息数量超过1000条,它仍然很慢,所以我建议使用批处理API以获得更大的数量。

9avjhtql

9avjhtql5#

这里有一个Python脚本,你可能会觉得很有用。您需要安装azure-storage-queue

queueService = QueueService(connection_string = "YOUR CONNECTION STRING")
for queue in queueService.list_queues():
  if "poison" in queue.name:
    print(queue.name)
    targetQueueName = queue.name.replace("-poison", "")
    while queueService.peek_messages(queue.name):
      for message in queueService.get_messages(queue.name, 32):
        print(".", end="", flush=True)
        queueService.put_message(targetQueueName, message.content)
        queueService.delete_message(queue.name, message.id, message.pop_receipt)
scyqe7ek

scyqe7ek6#

我只是不得不再次这样做,并花时间更新我的snipped到新的存储SDK。更多信息请参见https://www.bokio.se/engineering-blog/how-to-re-run-the-poison-queue-in-azure-webjobs/
下面是我使用的代码

using Azure.Storage.Queues;
using System;
using System.Threading;
using System.Threading.Tasks;

namespace AzureQueueTransfer
{
    internal class Program
    {
        // Need Read, Update & Process (full url, can create in storage explorer)
        private const string sourceQueueSAS = ""; 

        // Need Add (full url, can create in storage explorer)
        private const string targetQueueSAS = "";
        private static async Task Main(string[] args)
        {
            var sourceQueue = new QueueClient(new Uri(sourceQueueSAS));
            var targetQueue = new QueueClient(new Uri(targetQueueSAS));

            var queuedAny = true;
            while (queuedAny)
            {
                Thread.Sleep(30000); // Sleep to make sure we dont build too much backlog so we can process new messages on higher prio than old ones
                queuedAny = false;
                foreach (var message in sourceQueue.ReceiveMessages(maxMessages: 32).Value)
                {
                    queuedAny = true;
                    var res = await targetQueue.SendMessageAsync(message.Body);

                    Console.WriteLine($"Transfered: {message.MessageId}");
                    await sourceQueue.DeleteMessageAsync(message.MessageId, message.PopReceipt);
                }

                Console.WriteLine($"Finished batch");
            } 
        }
    }
}
slhcrj9b

slhcrj9b7#

对于任何来这里寻找一个相当于@MitchWheats的Node的人,使用Azure函数回答。

import AzureStorage from 'azure-storage'
import { Context, HttpRequest } from '@azure/functions'
import util from 'util'

const queueService = AzureStorage.createQueueService()
queueService.messageEncoder = new AzureStorage.QueueMessageEncoder.TextBase64QueueMessageEncoder()

const deleteMessage = util.promisify(queueService.deleteMessage).bind(queueService)
const createMessage = util.promisify(queueService.createMessage).bind(queueService)
const getMessage = util.promisify(queueService.getMessage).bind(queueService)

export async function run (context: Context, req: HttpRequest): Promise<void> {
  try {
    const poisonQueue = (req.query.queue || (req.body && req.body.queue));
    const targetQueue = poisonQueue.split('-')[0]

    let count = 0

    while (true) {
      const message = await getMessage(poisonQueue)
      if (!message) { break; }
      if (message.messageText && message.messageId && message.popReceipt) {
        await createMessage(targetQueue, message.messageText)
        await deleteMessage(poisonQueue, message.messageId, message.popReceipt)
      }
      count++
    }

    context.res = {
      body: `Replayed ${count} messages from ${poisonQueue} on ${targetQueue}`
    };
  } catch (e) {
    context.res = { status: 500 }
  }
}

要使用该函数,您需要提供用于存储队列的存储帐户的连接信息。这作为环境变量提供。您可以提供AZURE_STORAGE_ACCOUNTAZURE_STORAGE_ACCESS_KEY,或者提供AZURE_STORAGE_CONNECTION_STRING。更多关于这方面的信息可以在Azure Storage SDK docs中找到。
也写了几行关于它在这个Medium article

aij0ehis

aij0ehis8#

根据Jon Canning的回答更新了python:

from azure.storage.queue import QueueServiceClient

queueService = QueueServiceClient.from_connection_string(conn_str="DefaultEndpointsProtocol=https;AccountName=<account>;AccountKey=<key>;EndpointSuffix=core.windows.net")

for queue in queueService.list_queues():
  if "poison" in queue.name:
    print(queue.name)
    targetQueueName = queue.name.replace("-poison", "")
    queue = queueService.get_queue_client(queue=queue.name)
    targetQueue = queueService.get_queue_client(queue=targetQueueName)
    while queue.peek_messages() :
        messages = queue.receive_messages()
        for msg in messages:
            targetQueue.send_message(msg.content)
            queue.delete_message(msg)
yyhrrdl8

yyhrrdl89#

正如Mikael Eliasson所指出的,code in IGx89 answer损坏是因为
AddMessageAsync将覆盖消息上的一些信息,然后DeleteMessageAsync将给予404。更好的解决方案是将值复制到AddMessageAsync的新消息中
请参阅RetryPoisonMessages的增强版本,该版本仅指定消息列表(而不是队列中的所有消息),并允许复制消息而不是移动消息。它还记录每条消息的成功/失败。

/// <param name="storageAccountString"></param>
/// <param name="queuename"></param>
/// <param name="idsToMove">If not null, only messages with listed IDs will be moved/copied</param>
/// <param name="deleteFromPoisonQueue">if false,  messages will be copied; if true, they will be moved
///Warning: if queue is big, keeping deleteFromPoisonQueue=false can cause the same row 
///from poisonqueue to be copied more than once(the reason is not found yet)</param>
/// <returns></returns>
private static async Task<int> RetryPoisonMesssages(string storageAccountString, string queuename, string[] idsToMove=null, bool deleteFromPoisonQueue=false)
{
    var targetqueue = GetCloudQueueRef(storageAccountString, queuename);
    var poisonQueueName = queuename + "-poison";
    var poisonqueue = GetCloudQueueRef(storageAccountString, poisonQueueName);

    var count = 0;
    while (true)
    {
        var msg = await poisonqueue.GetMessageAsync();
        if (msg == null)
        {
            Console.WriteLine("No more messages in a queue " + poisonQueueName);
            break;
        }

        string action = "";
        try
        {
            if (idsToMove == null || idsToMove.Contains(msg.Id))
            {
                var msgToAdd = msg;
                if (deleteFromPoisonQueue)
                {
                    //The reason is that AddMessageAsync will overwrite some info on the message and then DeleteMessagAsync will give a 404.
                    //The better solution is to copy the values into a new message for AddMessageAsync 
                     msgToAdd = new CloudQueueMessage(msg.AsBytes);
                }

                action = "adding";
                await targetqueue.AddMessageAsync(msgToAdd);
                Console.WriteLine(action + " message ID " + msg.Id);
                if (deleteFromPoisonQueue)
                {
                    action = "deleting";
                    await poisonqueue.DeleteMessageAsync(msg);
                }
                Console.WriteLine(action + " message ID " + msg.Id);
            }
        }
        catch (Exception ex)
        {
            Console.WriteLine("Error encountered when "+ action + " " + ex.Message + " at message ID " + msg.Id);
        }

        count++;
    }

    return count;
}

相关问题