NodeJS 如何从Azure存储队列触发容器应用?

c3frrgcw  于 2022-12-18  发布在  Node.js
关注(0)|答案(1)|浏览(106)

我对使用NodeJS进行开发和使用容器还是个新手。
我正在NodeJS中开发一个侦听Azure存储队列的小应用程序。现在我希望队列中的新消息触发该应用程序,但我不知道如何实现。我计划使用Azure容器应用程序。
我在Microsoft SDK中看到了可以用于从队列读取消息的方法,但它是命令式的。可能是带有计时器的方法。当队列中有新消息时,是否可以执行容器应用中的函数?
有人知道吗?

1sbrub3j

1sbrub3j1#

您可以采取以下两种一般方法:
1.使用事件源SDK(如您所述)。这是特定SDK所要求的命令式(或声明式)。在本例中,它是Azure存储队列SDK,您需要类似于

const sleep = t => new Promise(resolve => setTimeout(resolve, t));

while (true) {
    const receivedMessages = await queueClient.receiveMessages();

    if (receivedMessages.receivedMessageItems.length > 0) {
        for (const msg of receivedMessages.receivedMessageItems) {
            // call your logic to handle and delete the msg                
            // await queueClient.deleteMessage(message.messageId, message.popReceipt);
        }
    } else {
        // queue is empty, check in 5 seconds
        await sleep(5000)
    }
}

1.使用一些其他的框架来为你抽象,并在消息上调用你的方法,例如daprAzure Functions

*对于Dapr,创建一个类似https://docs.dapr.io/reference/components-reference/supported-bindings/storagequeues/的dapr组件(请注意,如果你在Azure容器应用上部署,你必须使用Azure的ARM(或bicep)或使用az cli定义该组件。此示例创建一个blob组件,但它应该与队列组件非常相似。因此,你将拥有一个类似于https://docs.dapr.io/reference/components-reference/supported-bindings/storagequeues/的dapr组件:

# dapr-queue-component.yaml
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: ricardos-queue
spec:
  type: bindings.azure.storagequeues
  version: v1
  metadata:
  - name: accountName
    value: "yourAccountName"
  # you can use managed identity in Azure to not have to set this here
  # the doc linked above shows how to do that.
  # - name: accountKey
  #  value: "***********"
  - name: queueName
    value: "myqueue"

然后创建它:

➜ az containerapp env dapr-component set \
    --name $CONTAINERAPPS_ENVIRONMENT --resource-group $RESOURCE_GROUP \
    --dapr-component-name ricardos-queue \
    --yaml dapr-queue-component.yaml


最后你的代码看起来像这样(see dapr docs for more details):

//dependencies 
import { DaprServer, CommunicationProtocolEnum } from '@dapr/dapr'; 

//code
const daprHost = "127.0.0.1"; 
const serverHost = "127.0.0.1";
const serverPort = "6002"; 
const daprPort = "3602"; 

start().catch((e) => {
    console.error(e);
    process.exit(1);
});

async function start() {
    const server = new DaprServer(serverHost, serverPort, daprHost, daprPort, CommunicationProtocolEnum.HTTP);
    await server.binding.receive('ricardos-queue', async (msg) => {
        // handle msg
    });
    await server.startServer();
}

***对于Azure Functions:**您可以安装Azure Functions cli,然后

➜ func init . --javascript
➜ func new --template "Azure Queue Storage trigger" --name my-function-name
➜ cat my-function-name/function.json
{
  "bindings": [
    {
      "name": "myQueueItem",
      "type": "queueTrigger",
      "direction": "in",
      "queueName": "js-queue-items", # change to your queue name
      "connection": "CONNECTION_STRING" # env var with a connection string in it. or value in local.settings.json
    }
  ]
}

➜ cat my-function-name/index.js

module.exports = async function (context, msg) {
    # handle msg
};

在Azure容器应用上部署时,可以将缩放规则设置为如下所示 (有关定义缩放规则的详细信息,请参阅此处的文档,以便在队列上没有消息时应用将缩放为0)

"scale": {
        "maxReplicas": 10,
        "minReplicas": 0,
        "rules": [
          {
            "name": "my-queue-scale-rule",
            "custom": {
              "type": "azure-queue",
              "metadata": {
                "queueName": "my-queue",
                "queueLength": "50" // each instance can handle up to 50 messages with a max of 10 instances
              },
              "auth": [
                {
                  "secretRef": "my-connection-string",
                  "triggerParameter": "connection"
                }
              ]
            }
          }
        ]
      }
  • 无论使用何种SDK或框架(Azure Storage SDK、dapr、Azure Functions等),您始终可以使用类似的缩放规则根据负载上下缩放队列处理器 *

相关问题