Azure服务总线队列:在增加deliveryCount的同时重新计划消息

fkaflof6  于 2023-05-29  发布在  其他
关注(0)|答案(2)|浏览(182)

我使用Azure服务总线队列收集需要发布到API的消息。我正在队列上使用Azure触发器函数来处理传入消息。不幸的是,API是不可靠的,所以我正在添加我自己的重试计划。
例如,第一次无法处理消息时,我想在15分钟内安排另一次重试,然后是1小时、2小时,依此类推。到达maxDeliveryCount后,应将消息发送到DLQ。
消息的调度正在工作,但是,每个调度的消息都被视为新消息,这意味着deliveryCount始终为1。我使用@azure/service-bus NPM包在Azure函数中执行重新调度。
因此,我的问题是:如何在Azure触发器函数中重新计划SB队列消息,同时增加这些消息的deliveryCount
(我知道我可以将自己的deliveryCount属性添加到消息中,但我不想用元数据污染消息体。)
为了提供更多细节,这是一个MRE:

const serviceBusQueueTrigger: AzureFunction = async function (
  context: Context,
  msg: any
): Promise<void> {
  context.log("ServiceBus queue trigger function processed message", msg);

  const serviceBusClient = new ServiceBusClient(<connection_string>);
  const sender = serviceBusClient.createSender(<queue_name>);

  context.log("dequeueCount: ", context.bindingData.deliveryCount);

  try {
    // POST to API
  } catch (error) {
    
    // if an error is thrown here, trigger resubmits immediately
    // I want to wait proportionally to current deliveryCount
    
    await sender.scheduleMessages(
      {
        body: msgData,
        contentType: "application/json",
      },
      // just an example of schedule time
      new Date(Date.now() + Math.pow(2, context.bindingData.deliveryCount) * 60 * 1000)
    );
  }
  
  await sender.close();
}

===
编辑:虽然Microsoft Docs仍然包含有关内置重试机制的(模糊的)细节(@Manish在他的回答中链接),但该功能似乎不再支持。相关的讨论可以在here中找到。
因此,我最初的问题是,如何使用Azure服务总线触发器函数构建此机制?谢谢!
host.json

{
  "version": "2.0",
  "logging": {
    "applicationInsights": {
      "samplingSettings": {
        "isEnabled": true,
        "excludedTypes": "Request"
      }
    }
  },
  "extensionBundle": {
    "id": "Microsoft.Azure.Functions.ExtensionBundle",
    "version": "[4.0.0, 5.0.0)"
  },
  "concurrency": {
    "dynamicConcurrencyEnabled": true,
    "snapshotPersistenceEnabled": true
  },
  "extensions": {
    "serviceBus": {
      "clientRetryOptions": {
        "mode": "exponential",
        "tryTimeout": "00:01:00",
        "delay": "00:01:00",
        "maxDelay": "03:00:00",
        "maxRetries": 3
      }
    }
  }
}

function.json

{
  "bindings": [
    {
      "name": "incoming",
      "type": "serviceBusTrigger",
      "direction": "in",
      "queueName": "incoming",
      "connection": "SERVICEBUS"
    }
  ],
  "retry": {
    "strategy": "exponentialBackoff",
    "maxRetryCount": 5,
    "minimumInterval": "00:00:10",
    "maximumInterval": "00:15:00"
  },
  "scriptFile": "../dist/trigger/index.js"
}
c3frrgcw

c3frrgcw1#

问题是,每当你在队列中发送一条消息时,它就像对待一条新消息一样。
怎么修?
你不必在队列中推送新消息,只需删除try-catch,所有发送代码和函数应用程序将负责休息。

const serviceBusQueueTrigger: AzureFunction = async function (
  context: Context,
  msg: any
): Promise<void> {
  context.log("ServiceBus queue trigger function processed message", msg);
  context.log("dequeueCount: ", context.bindingData.deliveryCount);
    // POST to API
 
}

请确保您的function.json文件看起来像下面(代码复制自下面的链接)

{
"bindings": [
    {
    "queueName": "testqueue",
    "connection": "MyServiceBusConnection",
    "name": "myQueueItem",
    "type": "serviceBusTrigger",
    "direction": "in"
    }
],
"disabled": false
}

更多信息-> https://learn.microsoft.com/en-us/azure/azure-functions/functions-bindings-service-bus-trigger?tabs=python-v2%2Cin-process%2Cextensionv5&pivots=programming-language-javascript
对于指数重试,请确保您有host.json设置如下

{
    "version": "2.0",
    "extensions": {
        "serviceBus": {
            "clientRetryOptions":{
                "mode": "exponential",
                "tryTimeout": "00:01:00",
                "delay": "00:00:00.80",
                "maxDelay": "00:01:00",
                "maxRetries": 3
            },
            "prefetchCount": 0,
            "transportType": "amqpWebSockets",
            "webProxy": "https://proxyserver:8080",
            "autoCompleteMessages": true,
            "maxAutoLockRenewalDuration": "00:05:00",
            "maxConcurrentCalls": 16,
            "maxConcurrentSessions": 8,
            "maxMessageBatchSize": 1000,
            "sessionIdleTimeout": "00:01:00",
            "enableCrossEntityTransactions": false
        }
    }
}

x1c 0d1x更多信息-> https://learn.microsoft.com/en-us/azure/azure-functions/functions-bindings-service-bus?tabs=in-process%2Cextensionv5%2Cextensionv3&pivots=programming-language-javascript
编辑1:在阅读github issue之后,我们可以尝试使用重试

{
    "disabled": false,
    "bindings": [
        {
            ....
        }
    ],
    "retry": {
        "strategy": "exponentialBackoff",
        "maxRetryCount": 5,
        "minimumInterval": "00:00:10",
        "maximumInterval": "00:15:00"
    }
}

编辑2:确保您已经安装了bundle

{
    "version": "2.0",
    "extensionBundle": {
        "id": "Microsoft.Azure.Functions.ExtensionBundle",
        "version": "[3.3.0, 4.0.0)"
    }
}
juud5qan

juud5qan2#

@Manish我可以确认function.json上的重试选项有效。
@picklepick,为了解决你的需求,你可以按照Manish的建议在function.json上使用重试选项,并将队列上的最大交付计数设置为1。这将允许使用function.json在函数级别完成所有重试,然后在完成并失败后将消息移动到DLQ。
您也可以使用2(功能级别和服务总线级别)的组合,即让函数retries完成,将消息移回队列,并让它再次执行函数retries。为此,您需要将最大交付计数设置为2。
因此,使用这些选项,您可以使用配置执行重试,而无需在函数本身中引入额外的重试代码。

相关问题