RabbitMQ02_HelloWorld模型的消息生产和消费
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.7.2</version>
</dependency>
@Test
public void testSendMessage() throws IOException, TimeoutException {
//创建连接mq的连接工厂对象
ConnectionFactory connectionFactory = new ConnectionFactory();
//设置连接mq的主机
connectionFactory.setHost("118.31.106.51");
//设置端口号
connectionFactory.setPort(5672);
//设置连接哪个虚拟主机
connectionFactory.setVirtualHost("/ems");
//设置访问虚拟主机的用户名和密码
connectionFactory.setUsername("ems");
connectionFactory.setPassword("123456");
//获取连接对象
Connection connection = connectionFactory.newConnection();
//获取连接中的通道
Channel channel = connection.createChannel();
//通道绑定对应消息队列,参数1:队列名称(如果不存在则会自动创建)
//参数2:定义队列特性是否要持久化 参数3:是否独占队列 参数4:是否消费完成后自动删除队列
//参数5:附加参数
channel.queueDeclare("hello",false,false,false,null);
//发布消息
//参数1:交换机名称 参数2:队列名称 参数3:传递消息的额外参数
//参数4:消息的具体内容
channel.basicPublish("", "hello", null, "hello RabbitMQ".getBytes());
//关闭通道和连接
channel.close();
connection.close();
}
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("118.31.106.51");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/ems");
connectionFactory.setUsername("ems");
connectionFactory.setPassword("123456");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("hello", false, false, false, null);
// 消费消息 参数1:消息队列名称 参数2:开启消息的自动确认机制
// 参数3:消费时的回调接口
channel.basicConsume("hello", true, new DefaultConsumer(channel) {
//从最后一个参数(body)中取出消息
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
throws IOException {
System.out.println("body=="+new String(body));
}
});
//如果不关闭通道和连接,主函数将持续消费消息,否则只消费一次消息
//channel.close();
//connection.close();
}
package com.blu.utils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class RabbitMQUtils {
private static ConnectionFactory connectionFactory;
static {
connectionFactory = new ConnectionFactory();
connectionFactory.setHost("118.31.106.51");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/ems");
connectionFactory.setUsername("ems");
connectionFactory.setPassword("123456");
}
// 定义提供连接对象的方法
public static Connection getConnection() {
try {
return connectionFactory.newConnection();
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
// 关闭通道和连接的方法
public static void closeConnectionAndChannel(Channel channel, Connection connection) {
try {
if(channel!=null) {
channel.close();
}
if(connection!=null) {
connection.close();
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
@Test
public void testSendMessage() throws IOException, TimeoutException {
Connection connection = RabbitMQUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("hello",false,false,false,null);
channel.basicPublish("", "hello", null, "hello RabbitMQ".getBytes());
RabbitMQUtils.closeConnectionAndChannel(channel, connection);
}
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = RabbitMQUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("hello", false, false, false, null);
channel.basicConsume("hello", true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
throws IOException {
System.out.println("body=="+new String(body));
}
});
//RabbitMQUtils.closeConnectionAndChannel(channel, connection);
}
@Test
public void testSendMessage() throws IOException, TimeoutException {
Connection connection = RabbitMQUtils.getConnection();
Channel channel = connection.createChannel();
//参数2:表示允许队列持久化 参数3:是否独占队列 参数4:是否消费完成后自动删除队列
//参数5:附加参数
channel.queueDeclare("hello",true,false,false,null);
//参数1:交换机名称 参数3:传递消息的额外参数(MessageProperties.PERSISTENT_TEXT_PLAIN表示支持消息的持久化)
channel.basicPublish("", "hello", MessageProperties.PERSISTENT_TEXT_PLAIN, "hello RabbitMQ".getBytes());
RabbitMQUtils.closeConnectionAndChannel(channel, connection);
}
版权说明 : 本文为转载文章, 版权归原作者所有 版权申明
原文链接 : https://blucoding.blog.csdn.net/article/details/109490566
内容来源于网络,如有侵权,请联系作者删除!