我正在开发一个AMQPClient类来抽象RPC调用,在第一次调用时工作得很好,但是当再次调用时,correlationId
具有第一次调用的值。
async RPC<T>(queue: string, message: string): Promise<T> {
if (!this.channel) {
throw new Error('Channel not initialized')
}
const replyTo = `${queue}.reply`
await this.channel.assertQueue(replyTo)
await this.channel.assertQueue(queue)
return new Promise<T>((resolve) => {
const correlationId = Math.random().toString(36).slice(2)
console.log('generated correlationId: ', correlationId)
const onMessage = (message: ConsumeMessage | null) => {
console.log(
correlationId,
message?.properties.correlationId,
correlationId === message?.properties.correlationId
)
if (message && message.properties.correlationId === correlationId) {
resolve(JSON.parse(message.content.toString()))
this.channel?.removeListener('message', onMessage)
}
}
this.channel?.consume(replyTo, onMessage, { noAck: true })
this.channel?.sendToQueue(queue, Buffer.from(message), {
correlationId,
replyTo
})
})
}
输出:
generated correlationId: lwfvgqym5ya
lwfvgqym5ya lwfvgqym5ya true
generated correlationId: 1m09k9jk2xm
lwfvgqym5ya 1m09k9jk2xm false
第二次打印的correlationId与已经解析的第一次调用的correlationId匹配。第二个电话是在第一个解决后打的。
我已经尝试将const correlationId = Math.random().toString(36).slice(2)
移动到new Promise(...)
之外。我还尝试传递一个anon函数给调用onMessage函数的consume回调函数,没有成功。
this.channel?.consume(replyTo, (msg) => onMessage(msg), { noAck: true })
我还尝试将correlationId作为参数传递,上面的任何一个都不行。第二次调用总是使用de onMessage函数中correlationId的最后一个值。
完整代码:
import client, { Channel, Connection, ConsumeMessage } from 'amqplib'
class AMQPClient {
private channel?: Channel
constructor(private readonly amqpUrl: string) {
client.connect(this.amqpUrl).then((connection) => {
connection.createChannel().then((channel) => {
this.channel = channel
})
process.on('SIGINT', () => this.close(connection))
process.on('SIGTERM', () => this.close(connection))
})
}
async RPC<T>(queue: string, message: string): Promise<T> {
if (!this.channel) {
throw new Error('Channel not initialized')
}
const replyTo = `${queue}.reply`
await this.channel.assertQueue(replyTo)
await this.channel.assertQueue(queue)
return new Promise<T>((resolve) => {
const correlationId = Math.random().toString(36).slice(2)
console.log('generated correlationId: ', correlationId)
const onMessage = (message: ConsumeMessage | null) => {
console.log(
correlationId,
message?.properties.correlationId,
correlationId === message?.properties.correlationId
)
if (message && message.properties.correlationId === correlationId) {
resolve(JSON.parse(message.content.toString()))
this.channel?.removeListener('message', onMessage)
}
}
this.channel?.consume(replyTo, (msg) => onMessage(msg), { noAck: true })
this.channel?.sendToQueue(queue, Buffer.from(message), {
correlationId,
replyTo
})
})
}
close(connection: Connection) {
connection.close()
process.exit(0)
}
}
const amqpClient = new AMQPClient(process.env.AMQP_URL || 'amqp://localhost')
export { amqpClient, AMQPClient }
电话:
this.amqpClient.RPC<MerchantStatus>(
'getMerchantStatus',
JSON.stringify({ merchantId: 'test' })
)
1条答案
按热度按时间7y4bm7vi1#
正如@jfriend00所指出的,onMessage回调函数没有从consume函数中删除,所以它保留了第一个correlationId的值。
解决方案是以correlationId为键,回调函数为值,创建一个HashMap。因此当队列被使用时,它会检查回调HashMap,并在消息上发送correlationId;如果它找到一个注册的回调,则调用该回调,用消息值解析promise。
工作编码:
More details关于RPC over RabbitMQ。