RabbitMQ02_HelloWorld模型的消息生产和消费

x33g5p2x  于2021-12-19 转载在 其他  
字(4.4k)|赞(0)|评价(0)|浏览(456)

RabbitMQ02_HelloWorld模型的消息生产和消费

  • 引入RabbitMQ的相关依赖:
  1. <dependency>
  2. <groupId>com.rabbitmq</groupId>
  3. <artifactId>amqp-client</artifactId>
  4. <version>5.7.2</version>
  5. </dependency>
  • 测试消息生产者:
  1. @Test
  2. public void testSendMessage() throws IOException, TimeoutException {
  3. //创建连接mq的连接工厂对象
  4. ConnectionFactory connectionFactory = new ConnectionFactory();
  5. //设置连接mq的主机
  6. connectionFactory.setHost("118.31.106.51");
  7. //设置端口号
  8. connectionFactory.setPort(5672);
  9. //设置连接哪个虚拟主机
  10. connectionFactory.setVirtualHost("/ems");
  11. //设置访问虚拟主机的用户名和密码
  12. connectionFactory.setUsername("ems");
  13. connectionFactory.setPassword("123456");
  14. //获取连接对象
  15. Connection connection = connectionFactory.newConnection();
  16. //获取连接中的通道
  17. Channel channel = connection.createChannel();
  18. //通道绑定对应消息队列,参数1:队列名称(如果不存在则会自动创建)
  19. //参数2:定义队列特性是否要持久化 参数3:是否独占队列 参数4:是否消费完成后自动删除队列
  20. //参数5:附加参数
  21. channel.queueDeclare("hello",false,false,false,null);
  22. //发布消息
  23. //参数1:交换机名称 参数2:队列名称 参数3:传递消息的额外参数
  24. //参数4:消息的具体内容
  25. channel.basicPublish("", "hello", null, "hello RabbitMQ".getBytes());
  26. //关闭通道和连接
  27. channel.close();
  28. connection.close();
  29. }
  • 测试消息消费者
  1. public static void main(String[] args) throws IOException, TimeoutException {
  2. ConnectionFactory connectionFactory = new ConnectionFactory();
  3. connectionFactory.setHost("118.31.106.51");
  4. connectionFactory.setPort(5672);
  5. connectionFactory.setVirtualHost("/ems");
  6. connectionFactory.setUsername("ems");
  7. connectionFactory.setPassword("123456");
  8. Connection connection = connectionFactory.newConnection();
  9. Channel channel = connection.createChannel();
  10. channel.queueDeclare("hello", false, false, false, null);
  11. // 消费消息 参数1:消息队列名称 参数2:开启消息的自动确认机制
  12. // 参数3:消费时的回调接口
  13. channel.basicConsume("hello", true, new DefaultConsumer(channel) {
  14. //从最后一个参数(body)中取出消息
  15. @Override
  16. public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
  17. throws IOException {
  18. System.out.println("body=="+new String(body));
  19. }
  20. });
  21. //如果不关闭通道和连接,主函数将持续消费消息,否则只消费一次消息
  22. //channel.close();
  23. //connection.close();
  24. }
  • 代码优化:封装连接工具类
  1. package com.blu.utils;
  2. import com.rabbitmq.client.Channel;
  3. import com.rabbitmq.client.Connection;
  4. import com.rabbitmq.client.ConnectionFactory;
  5. public class RabbitMQUtils {
  6. private static ConnectionFactory connectionFactory;
  7. static {
  8. connectionFactory = new ConnectionFactory();
  9. connectionFactory.setHost("118.31.106.51");
  10. connectionFactory.setPort(5672);
  11. connectionFactory.setVirtualHost("/ems");
  12. connectionFactory.setUsername("ems");
  13. connectionFactory.setPassword("123456");
  14. }
  15. // 定义提供连接对象的方法
  16. public static Connection getConnection() {
  17. try {
  18. return connectionFactory.newConnection();
  19. } catch (Exception e) {
  20. e.printStackTrace();
  21. }
  22. return null;
  23. }
  24. // 关闭通道和连接的方法
  25. public static void closeConnectionAndChannel(Channel channel, Connection connection) {
  26. try {
  27. if(channel!=null) {
  28. channel.close();
  29. }
  30. if(connection!=null) {
  31. connection.close();
  32. }
  33. } catch (Exception e) {
  34. e.printStackTrace();
  35. }
  36. }
  37. }
  • 优化消息生产者:
  1. @Test
  2. public void testSendMessage() throws IOException, TimeoutException {
  3. Connection connection = RabbitMQUtils.getConnection();
  4. Channel channel = connection.createChannel();
  5. channel.queueDeclare("hello",false,false,false,null);
  6. channel.basicPublish("", "hello", null, "hello RabbitMQ".getBytes());
  7. RabbitMQUtils.closeConnectionAndChannel(channel, connection);
  8. }
  • 优化消息消费者:
  1. public static void main(String[] args) throws IOException, TimeoutException {
  2. Connection connection = RabbitMQUtils.getConnection();
  3. Channel channel = connection.createChannel();
  4. channel.queueDeclare("hello", false, false, false, null);
  5. channel.basicConsume("hello", true, new DefaultConsumer(channel) {
  6. @Override
  7. public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
  8. throws IOException {
  9. System.out.println("body=="+new String(body));
  10. }
  11. });
  12. //RabbitMQUtils.closeConnectionAndChannel(channel, connection);
  13. }
  • 参数补充:
  1. @Test
  2. public void testSendMessage() throws IOException, TimeoutException {
  3. Connection connection = RabbitMQUtils.getConnection();
  4. Channel channel = connection.createChannel();
  5. //参数2:表示允许队列持久化 参数3:是否独占队列 参数4:是否消费完成后自动删除队列
  6. //参数5:附加参数
  7. channel.queueDeclare("hello",true,false,false,null);
  8. //参数1:交换机名称 参数3:传递消息的额外参数(MessageProperties.PERSISTENT_TEXT_PLAIN表示支持消息的持久化)
  9. channel.basicPublish("", "hello", MessageProperties.PERSISTENT_TEXT_PLAIN, "hello RabbitMQ".getBytes());
  10. RabbitMQUtils.closeConnectionAndChannel(channel, connection);
  11. }

相关文章