javascript 什么原因导致使用第一个调用变量值的promise

pjngdqdw  于 2023-05-27  发布在  Java
关注(0)|答案(1)|浏览(68)

我正在开发一个AMQPClient类来抽象RPC调用,在第一次调用时工作得很好,但是当再次调用时,correlationId具有第一次调用的值。

async RPC<T>(queue: string, message: string): Promise<T> {
    if (!this.channel) {
      throw new Error('Channel not initialized')
    }

    const replyTo = `${queue}.reply`
    await this.channel.assertQueue(replyTo)
    await this.channel.assertQueue(queue)

    return new Promise<T>((resolve) => {
      const correlationId = Math.random().toString(36).slice(2)
      console.log('generated correlationId: ', correlationId)
      const onMessage = (message: ConsumeMessage | null) => {
        console.log(
          correlationId,
          message?.properties.correlationId,
          correlationId === message?.properties.correlationId
        )
        if (message && message.properties.correlationId === correlationId) {
          resolve(JSON.parse(message.content.toString()))
          this.channel?.removeListener('message', onMessage)
        }
      }

      this.channel?.consume(replyTo, onMessage, { noAck: true })
      this.channel?.sendToQueue(queue, Buffer.from(message), {
        correlationId,
        replyTo
      })
    })
  }

输出:

generated correlationId:  lwfvgqym5ya
lwfvgqym5ya lwfvgqym5ya true

generated correlationId:  1m09k9jk2xm
lwfvgqym5ya 1m09k9jk2xm false

第二次打印的correlationId与已经解析的第一次调用的correlationId匹配。第二个电话是在第一个解决后打的。
我已经尝试将const correlationId = Math.random().toString(36).slice(2)移动到new Promise(...)之外。我还尝试传递一个anon函数给调用onMessage函数的consume回调函数,没有成功。

this.channel?.consume(replyTo, (msg) => onMessage(msg), { noAck: true })

我还尝试将correlationId作为参数传递,上面的任何一个都不行。第二次调用总是使用de onMessage函数中correlationId的最后一个值。
完整代码:

import client, { Channel, Connection, ConsumeMessage } from 'amqplib'

class AMQPClient {
  private channel?: Channel

  constructor(private readonly amqpUrl: string) {
    client.connect(this.amqpUrl).then((connection) => {
      connection.createChannel().then((channel) => {
        this.channel = channel
      })

      process.on('SIGINT', () => this.close(connection))
      process.on('SIGTERM', () => this.close(connection))
    })
  }

  async RPC<T>(queue: string, message: string): Promise<T> {
    if (!this.channel) {
      throw new Error('Channel not initialized')
    }

    const replyTo = `${queue}.reply`
    await this.channel.assertQueue(replyTo)
    await this.channel.assertQueue(queue)

    return new Promise<T>((resolve) => {
      const correlationId = Math.random().toString(36).slice(2)
      console.log('generated correlationId: ', correlationId)
      const onMessage = (message: ConsumeMessage | null) => {
        console.log(
          correlationId,
          message?.properties.correlationId,
          correlationId === message?.properties.correlationId
        )
        if (message && message.properties.correlationId === correlationId) {
          resolve(JSON.parse(message.content.toString()))
          this.channel?.removeListener('message', onMessage)
        }
      }

      this.channel?.consume(replyTo, (msg) => onMessage(msg), { noAck: true })
      this.channel?.sendToQueue(queue, Buffer.from(message), {
        correlationId,
        replyTo
      })
    })
  }

  close(connection: Connection) {
    connection.close()
    process.exit(0)
  }
}

const amqpClient = new AMQPClient(process.env.AMQP_URL || 'amqp://localhost')
export { amqpClient, AMQPClient }

电话:

this.amqpClient.RPC<MerchantStatus>(
  'getMerchantStatus',
  JSON.stringify({ merchantId: 'test' })
)
7y4bm7vi

7y4bm7vi1#

正如@jfriend00所指出的,onMessage回调函数没有从consume函数中删除,所以它保留了第一个correlationId的值。

解决方案是以correlationId为键,回调函数为值,创建一个HashMap。因此当队列被使用时,它会检查回调HashMap,并在消息上发送correlationId;如果它找到一个注册的回调,则调用该回调,用消息值解析promise。
工作编码:

class AMQPClient {
  private callbacks: Record<string, (message: ConsumeMessage) => void> = {}

  async RPC<T>(queue: string, message: string): Promise<T> {
    if (!this.channel) {
      throw new Error('Channel not initialized')
    }

    const replyTo = `${queue}.reply`
    await this.channel.assertQueue(replyTo)
    await this.channel.assertQueue(queue)
    const correlationId = Math.random().toString(36).slice(2)

    return new Promise((resolve) => {
      this.callbacks[correlationId] = (message) => {
        resolve(JSON.parse(message.content.toString()))
        delete this.callbacks[correlationId]
      }

      this.channel?.consume(replyTo, (message) => {
        if (message) {
          const correlationId = message.properties.correlationId
          const callback = this.callbacks[correlationId]
          if (callback) {
            callback(message)
          }
        }
      })
      this.channel?.sendToQueue(queue, Buffer.from(message), {
        correlationId,
        replyTo
      })
    })
  }
}

More details关于RPC over RabbitMQ。

相关问题