我有不同的订阅主题,每个订阅都有一个消费者。问题是随着时间的流逝,信息的数量会越来越少.我使用了maxconcurrentcalls。但它似乎也只在开始时起作用。我使用的是Azure/service-bus npm包。以及Azure bus code sample使用用于消费消息的代码。虽然对它进行了一些更改,用于相关调用和自动完成消息。
我之前为所有消费者建立了一个单一的连接,我尝试为每个消费者建立一个单独的连接。但没有用
代码:
let receiver: any
let sbClient: ServiceBusClient
try {
sbClient = new ServiceBusClient(connectionString);
logger.info("Azure connection is establish successfully for topic:", topic)
}
catch (Err) {
logger.error("Error in subscribing events from azure service bus", Err)
return
}
try {
receiver = sbClient?.createReceiver(topic, subscriptionName);
logger.debug(`Receiver for ${topic} Connected Successfully.`)
}
catch (err) {
sbClient?.close()
logger.error(`Error in creating receiver for ${topic}`, err)
return null
}
try {
const subscription = receiver.subscribe({
processMessage: async (brokeredMessage: ServiceBusReceivedMessage) => {
var input = brokeredMessage.body
let result = await processData(input)
if (result) {
await receiver.completeMessage(brokeredMessage)
} else {
await receiver.abandonMessage(brokeredMessage)
}
},
processError: async (args: any) => {
logger.error(`Error from source ${args.errorSource} occurred: `, args.error);
// the `subscribe() call will not stop trying to receive messages without explicit intervention from you.
if (isServiceBusError(args.error)) {
switch (args.error.code) {
case "MessagingEntityDisabled":
case "MessagingEntityNotFound":
case "UnauthorizedAccess":
// It's possible you have a temporary infrastructure change (for instance, the entity being
// temporarily disabled). The handler will continue to retry if `close()` is not called on the subscription - it is completely up to you
// what is considered fatal for your program.
logger.error(
`An unrecoverable error occurred. Stopping processing. ${args.error.code}`,
args.error
);
await subscription.close();
break;
case "MessageLockLost":
logger.error(`Message lock lost for message`, args.error);
break;
case "ServiceBusy":
// choosing an arbitrary amount of time to wait.
await delay(1000);
break;
default:
logger.error("Error in processing message", args)
}
}
},
}, { autoCompleteMessages: false, maxConcurrentCalls: 100 });
return receiver
}
catch (err) {
await receiver?.close();
logger.error("Error in subscribing messages", err)
return null
}
字符串
1条答案
按热度按时间zzzyeukh1#
检查以下几点以加快此过程:
autoCompleteMessages
应该设置为false。因此,根据处理结果,避免建立未完成的消息,这可能会减慢消费者的速度,并检查您是否在处理后迅速完成消息。字符串
验证码:
型
的数据
输出:
的