这是我第一次使用node.js测试RabbitMQ,我使用了amqplib。
首先,我运行
节点./消息/用户. js
输出如下-:
- 连接至RabbitMQ
- 通道已创建
- 正在等待消息...
其次,我运行
节点./消息/生产者. js
输出如下-:
- 连接至RabbitMQ
- 通道已创建
- 发送的消息:你好,世界!
- 与RabbitMQ的连接已关闭
从RabbitMQ管理控制台上,我观察到test_exchange,test_queue和test_key的存在,但没有任何消息相关的信息。而且消费者终端也没有记录任何接收消息的指示。它仍然显示消息"等待消息"。您能告诉我,我可能在哪里忽略了这个信息吗?
//config.js
module.exports = {
rabbitmq: {
host: 'localhost',
port: 5672,
username: 'guest',
password: 'guest',
vhost: '/',
exchange: 'test_exchange',
queue: 'test_queue',
routingKey: 'test_key'
}
}
//rabbitmq.js
const amqp = require("amqplib");
const config = require("../config/config");
class RabbitMQ {
constructor() {
this.connection = null;
this.channel = null;
}
async connect() {
try {
const { host, port, username, password, vhost } = config.rabbitmq;
this.connection = await amqp.connect(
`amqp://${username}:${password}@${host}:${port}/${vhost}`
);
console.log("Connected to RabbitMQ");
return this.connection;
} catch (error) {
console.error("Error connecting to RabbitMQ", error);
}
}
async createChannel() {
try {
if (!this.connection) {
await this.connect();
}
this.channel = await this.connection.createChannel();
console.log("Channel created");
return this.channel;
} catch (error) {
console.error("Error creating channel", error);
}
}
async close() {
try {
await this.connection.close();
console.log("Connection to RabbitMQ closed");
} catch (error) {
console.error("Error closing connection to RabbitMQ", error);
}
}
}
module.exports = new RabbitMQ();
//producer.js
const rabbitmq = require('../lib/rabbitmq');
const config = require('../config/config');
async function produceMessage(message) {
try {
const channel = await rabbitmq.createChannel();
const exchange = config.rabbitmq.exchange;
const queue = config.rabbitmq.queue;
const key = config.rabbitmq.routingKey;
await channel.assertExchange(exchange, 'direct', { durable: true });
await channel.assertQueue(queue, { durable: true });
await channel.bindQueue(queue, exchange, key);
const messageBuffer = Buffer.from(message);
await channel.publish(exchange, key, messageBuffer);
console.log(`Message sent: ${message}`);
await rabbitmq.close();
} catch (error) {
console.error('Error producing message', error);
}
}
produceMessage('Hello, world!');
//consumer.js
const rabbitmq = require('../lib/rabbitmq');
const config = require('../config/config');
async function consumeMessage() {
try {
const channel = await rabbitmq.createChannel();
const exchange = config.rabbitmq.exchange;
const queue = config.rabbitmq.queue;
const key = config.rabbitmq.routingKey;
await channel.assertExchange(exchange, 'direct', { durable: true });
await channel.assertQueue(queue, { durable: true });
await channel.bindQueue(queue, exchange, key);
channel.consume(queue, (msg) => {
console.log(`Message received: ${msg.content.toString()}`);
channel.ack(msg);
}, { noAck: false });
console.log('Waiting for messages...');
} catch (error) {
console.error('Error consuming message', error);
}
}
consumeMessage();
1条答案
按热度按时间2wnc66cl1#
您的邮件发送失败,因为您在执行
publish
命令后立即关闭连接。您可以尝试通过注解producer.js
中的await rabbitmq.close();
行来执行此操作。如果你想在发送消息后关闭连接,你可以创建
confirm channel
而不是普通的通道,这样你就可以接收send
的确认。更改
rabbitmq.js
文件中的通道创建行:在
producer.js
中,关闭连接前调用waitForConfirms
函数: