npm Azure服务总线使用者会随着时间的推移而变慢

k2fxgqgv  于 2023-08-06  发布在  其他
关注(0)|答案(1)|浏览(103)

我有不同的订阅主题,每个订阅都有一个消费者。问题是随着时间的流逝,信息的数量会越来越少.我使用了maxconcurrentcalls。但它似乎也只在开始时起作用。我使用的是Azure/service-bus npm包。以及Azure bus code sample使用用于消费消息的代码。虽然对它进行了一些更改,用于相关调用和自动完成消息。
我之前为所有消费者建立了一个单一的连接,我尝试为每个消费者建立一个单独的连接。但没有用
代码:

let receiver: any
let sbClient: ServiceBusClient
try {
    sbClient = new ServiceBusClient(connectionString);
    logger.info("Azure connection is establish successfully for topic:", topic)
}
catch (Err) {
    logger.error("Error in subscribing events from azure service bus", Err)
    return
}
try {
    receiver = sbClient?.createReceiver(topic, subscriptionName);
    logger.debug(`Receiver for ${topic} Connected Successfully.`)
}
catch (err) {
    sbClient?.close()
    logger.error(`Error in creating receiver for ${topic}`, err)
    return null
}

try {
    const subscription = receiver.subscribe({
        processMessage: async (brokeredMessage: ServiceBusReceivedMessage) => {
            var input = brokeredMessage.body
            let result = await processData(input)
            if (result) {
                await receiver.completeMessage(brokeredMessage)
            } else {
                await receiver.abandonMessage(brokeredMessage)
            }
        },

        processError: async (args: any) => {
            logger.error(`Error from source ${args.errorSource} occurred: `, args.error);

            // the `subscribe() call will not stop trying to receive messages without explicit intervention from you.
            if (isServiceBusError(args.error)) {
                switch (args.error.code) {
                    case "MessagingEntityDisabled":
                    case "MessagingEntityNotFound":
                    case "UnauthorizedAccess":
                        // It's possible you have a temporary infrastructure change (for instance, the entity being
                        // temporarily disabled). The handler will continue to retry if `close()` is not called on the subscription - it is completely up to you
                        // what is considered fatal for your program.
                        logger.error(
                            `An unrecoverable error occurred. Stopping processing. ${args.error.code}`,
                            args.error
                        );
                        await subscription.close();
                        break;
                    case "MessageLockLost":
                        logger.error(`Message lock lost for message`, args.error);
                        break;
                    case "ServiceBusy":
                        // choosing an arbitrary amount of time to wait.
                        await delay(1000);
                        break;
                    default:
                        logger.error("Error in processing message", args)
                }
            }
        },
    }, { autoCompleteMessages: false, maxConcurrentCalls: 100 });

    return receiver
}
catch (err) {
    await receiver?.close();
    logger.error("Error in subscribing messages", err)
    return null
}

字符串

zzzyeukh

zzzyeukh1#

检查以下几点以加快此过程:

  • autoCompleteMessages应该设置为false。因此,根据处理结果,避免建立未完成的消息,这可能会减慢消费者的速度,并检查您是否在处理后迅速完成消息。
  • 使用接收方的prefetchCount属性。预取计数确定接收方在单个请求中可以请求的最大消息数。
  • lockDuration设置使用者锁定消息以进行处理的时间长短由锁定持续时间确定。
receiveMode:  "peekLock",
maxConcurrentCalls:  30,
autoCompleteMessages:  false,

字符串

验证码:

const  {  ServiceBusClient  } = require("@azure/service-bus");
async  function  startConsumer(connectionString,  topic,  subscriptionName)  {
let  receiver;
let  sbClient;
try  {
sbClient = new  ServiceBusClient(connectionString);
console.log("Azure connection established successfully for topic:",  topic);
}  catch (err) {
console.error("Error in connecting to Azure Service Bus:",  err);
return;
}
try  {
receiver = sbClient.createReceiver(topic,  subscriptionName,  {
receiveMode:  "peekLock",
maxConcurrentCalls:  30,
autoCompleteMessages:  false,
});
console.log(`Receiver for ${topic} connected successfully.`);
}  catch (err) {
sbClient.close();
console.error(`Error in creating receiver for ${topic}:`,  err);
return  null;
}
const  processMessage = async  (brokeredMessage)  =>  {
const  input = brokeredMessage.body.toString();
const  result = await  processData(input);
if (result) {
await  receiver.completeMessage(brokeredMessage);
}  else  {
await  receiver.abandonMessage(brokeredMessage);
}
};
const  processError = async  (args)  =>  {

console.error(`Error from source ${args.errorSource} occurred:`,  args.error);

if (args.error.code === "MessagingEntityDisabled" ||
args.error.code === "MessagingEntityNotFound" ||
args.error.code === "UnauthorizedAccess") {

console.error("An unrecoverable error occurred. Stopping processing.",  args.error);

await  receiver.close();

}  else  if (args.error.code === "MessageLockLost") {

console.error("Message lock lost for message",  args.error);

}  else  if (args.error.code === "ServiceBusy") {

await  customDelay(1000); 

}  else  {

console.error("Error in processing message",  args);

}

};

const  subscription = receiver.subscribe({

processMessage,

processError,

});
return  receiver;

}
async  function  processData(input)  {
console.log("Processing message:",  input);
return  true;

}
function  customDelay(ms)  {
return  new  Promise((resolve)  =>  setTimeout(resolve,  ms));

}
const  connectionString = "Endpoint=sb:";
const  topic = "sam";
const  subscriptionName = "sampath";
startConsumer(connectionString,  topic,  subscriptionName)
.then((receiver)  =>  {
if (receiver) {
console.log("Consumer started successfully.");
}  else  {
console.log("Failed to start the consumer.");
}
})
.catch((err)  =>  {
console.error("An error occurred while starting the consumer:",  err);
});


的数据

输出:


相关问题