在RabbitMQ中无法接收消息的原因可能是什么?

dbf7pr2w  于 2023-02-22  发布在  RabbitMQ
关注(0)|答案(1)|浏览(549)

这是我第一次使用node.js测试RabbitMQ,我使用了amqplib。
首先,我运行
节点./消息/用户. js
输出如下-:

  • 连接至RabbitMQ
  • 通道已创建
  • 正在等待消息...

其次,我运行
节点./消息/生产者. js
输出如下-:

  • 连接至RabbitMQ
  • 通道已创建
  • 发送的消息:你好,世界!
  • 与RabbitMQ的连接已关闭

从RabbitMQ管理控制台上,我观察到test_exchange,test_queue和test_key的存在,但没有任何消息相关的信息。而且消费者终端也没有记录任何接收消息的指示。它仍然显示消息"等待消息"。您能告诉我,我可能在哪里忽略了这个信息吗?
//config.js

module.exports = {
    rabbitmq: {
      host: 'localhost',
      port: 5672,
      username: 'guest',
      password: 'guest',
      vhost: '/',
      exchange: 'test_exchange',
      queue: 'test_queue',
      routingKey: 'test_key'
    }
  }

//rabbitmq.js

const amqp = require("amqplib");
const config = require("../config/config");

class RabbitMQ {
  constructor() {
    this.connection = null;
    this.channel = null;
  }

  async connect() {
    try {
      const { host, port, username, password, vhost } = config.rabbitmq;
      this.connection = await amqp.connect(
        `amqp://${username}:${password}@${host}:${port}/${vhost}`
      );
      console.log("Connected to RabbitMQ");
      return this.connection;
    } catch (error) {
      console.error("Error connecting to RabbitMQ", error);
    }
  }

  async createChannel() {
    try {
      if (!this.connection) {
        await this.connect();
      }
      this.channel = await this.connection.createChannel();
      console.log("Channel created");
      return this.channel;
    } catch (error) {
      console.error("Error creating channel", error);
    }
  }

  async close() {
    try {
      await this.connection.close();
      console.log("Connection to RabbitMQ closed");
    } catch (error) {
      console.error("Error closing connection to RabbitMQ", error);
    }
  }
}

module.exports = new RabbitMQ();

//producer.js

const rabbitmq = require('../lib/rabbitmq');
const config = require('../config/config');

async function produceMessage(message) {
  try {
    const channel = await rabbitmq.createChannel();
    const exchange = config.rabbitmq.exchange;
    const queue = config.rabbitmq.queue;
    const key = config.rabbitmq.routingKey;

    await channel.assertExchange(exchange, 'direct', { durable: true });
    await channel.assertQueue(queue, { durable: true });
    await channel.bindQueue(queue, exchange, key);

    const messageBuffer = Buffer.from(message);
    await channel.publish(exchange, key, messageBuffer);
    console.log(`Message sent: ${message}`);
    await rabbitmq.close();
  } catch (error) {
    console.error('Error producing message', error);
  }
}

produceMessage('Hello, world!');

//consumer.js

const rabbitmq = require('../lib/rabbitmq');
const config = require('../config/config');

async function consumeMessage() {
  try {
    const channel = await rabbitmq.createChannel();
    const exchange = config.rabbitmq.exchange;
    const queue = config.rabbitmq.queue;
    const key = config.rabbitmq.routingKey;

    await channel.assertExchange(exchange, 'direct', { durable: true });
    await channel.assertQueue(queue, { durable: true });
    await channel.bindQueue(queue, exchange, key);

    channel.consume(queue, (msg) => {
      console.log(`Message received: ${msg.content.toString()}`);
      channel.ack(msg);
    }, { noAck: false });

    console.log('Waiting for messages...');
  } catch (error) {
    console.error('Error consuming message', error);
  }
}

consumeMessage();
2wnc66cl

2wnc66cl1#

    • 问题:**

您的邮件发送失败,因为您在执行publish命令后立即关闭连接。您可以尝试通过注解producer.js中的await rabbitmq.close();行来执行此操作。

    • 解决方案:**

如果你想在发送消息后关闭连接,你可以创建confirm channel而不是普通的通道,这样你就可以接收send的确认。

    • 1.渠道创建**

更改rabbitmq.js文件中的通道创建行:

this.channel = await this.connection.createConfirmChannel();
    • 2.生产商:**

producer.js中,关闭连接前调用waitForConfirms函数:

await channel.waitForConfirms();
await rabbitmq.close();

相关问题