ActiveMQ(三):ActiveMQ的安全机制、api及订阅模式demo

x33g5p2x  于2021-12-19 转载在 其他  
字(7.7k)|赞(0)|评价(0)|浏览(381)

一、ActiveMQ安全机制

ActiveMQ是使用jetty部署的,修改密码需要到相应的配置文件 
配置文件是这个:

在其第123行添加用户名和密码,添加配置如下:

<plugins>
            <simpleAuthenticationPlugin>
                <users>
                    <authenticationUser username="bhz" password="bhz" groups="users,admins"/>
                </users>
            </simpleAuthenticationPlugin>
        </plugins>
  •  

这时候这个需要改为这样才能进行连接:

//第一步:建立ConnectionFactory工厂对象
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("bhz",
                "bhz","tcp://localhost:61616");
  •  

二、ActiveMQ的api的使用

1. Connection的使用

省略

2. Session方法的使用

一旦从ConnectionFactory中获得一个Connection,必须从Connection中创建一个或多个Session.Session是一个发送或接收消息的线程 
session可以被事务化,也可以不被事务化 
如果使用事务的话,那么需要commit,如下

package test.mq.helloworld;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class Sender {

    public static void main(String[] agrs) throws Exception{

        //第一步:建立ConnectionFactory工厂对象
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("bhz",
                "bhz","tcp://localhost:61616");

        //第二步:通过ConnectionFactory工厂对象我们创建一个Connection对象
        Connection connection = connectionFactory.createConnection();
        connection.start();

        //第三步:通过connection对象创建Session会话,第一个参数为是否开启事务,第二个参数为签收模式,一般设置为自动签收
        Session session = connection.createSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE);

        //第四步:通过Session创建Destination对象,queue1可看作是放入队列的消息名称,可以自定义
        Destination desiDestination = session.createQueue("queue1");

        //第五步:通过session创建消息的生产者或消费者,下面是创建生产者
        MessageProducer messageProducer = session.createProducer(desiDestination);

        //第六步:使用MessageProducer的set方法为其设置持久化特性和非持久化特性,后面再详细介绍
        messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);//这里先设置为非持久化

        //第七步:通过JMS规范的TextMessage形式创建数据(通过session对象),并用MessageProducer的send方法发送数据
        for(int i =0; i<5;i++){
            TextMessage textMessage =  session.createTextMessage("这是消息内容,id为"+i);
            messageProducer.send(textMessage);
        }
        session.commit();
        if(connection!=null){
            connection.close();
        }

    }

}
  •  

注意session.commit(); 
session的签收模式有三种情况: 
1. Session.AUTO_ACKNOWLEDGE :自动签收 
2. Session.CLIENT_ACKNOWLEDGE:客户端通过调用消息(Message)的acknowledge方法签收消息,在这种情况下,签收发生在Session层面:签收一个已消费的消息会自动签收这个Session所有已消费消息的收条 
3. Session.DUPS_OK_ACKNOWLEDGE:此选项指示Session不必确保对传送消息的签收。它可能引起消息的重复,但是降低了Session的开销,所以只有客户端能容忍重复的消息,才可使用 
上面三种模式,推荐使用的是Session.CLIENT_ACKNOWLEDGE 
它的操作如下:

package test.mq.helloworld;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class Receiver {

    public static void main(String[] agrs) throws Exception{

        //第一步:建立ConnectionFactory工厂对象
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("bhz",
                "bhz","tcp://localhost:61616");

        //第二步:通过ConnectionFactory工厂对象我们创建一个Connection对象
        Connection connection = connectionFactory.createConnection();
        connection.start();

        //第三步:通过connection对象创建Session会话,第一个参数为是否开启事务,第二个参数为签收模式,一般设置为自动签收
        Session session = connection.createSession(Boolean.FALSE,Session.CLIENT_ACKNOWLEDGE);

        //第四步:通过Session创建Destination对象,queue1可看作是放入队列的消息名称,可以自定义
        Destination desiDestination = session.createQueue("queue1");

        //第五步:通过session创建消息的生产者或消费者,下面是创建消费者
        MessageConsumer messageConsumer = session.createConsumer(desiDestination);

        while(true){
            TextMessage msg = (TextMessage) messageConsumer.receive();
            msg.acknowledge();
            if(msg==null) {
                break;
            }
            System.out.println("收到的内容为"+msg.getText());
        }
        if(connection!=null){
            connection.close();
        }

    }

}
  •  

使用该模式,需要手动的设置接收完毕的信号msg.acknowledge();,这可以当做告诉队列,这个消息已经接收完毕,可以销毁了。

3.MessageProducer


 
下面是改造后的代码:

package test.mq.helloworld;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class Sender {

    public static void main(String[] agrs) throws Exception{

        //第一步:建立ConnectionFactory工厂对象
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("bhz",
                "bhz","tcp://localhost:61616");

        //第二步:通过ConnectionFactory工厂对象我们创建一个Connection对象
        Connection connection = connectionFactory.createConnection();
        connection.start();

        //第三步:通过connection对象创建Session会话,第一个参数为是否开启事务,第二个参数为签收模式,一般设置为自动签收
        Session session = connection.createSession(Boolean.TRUE,Session.CLIENT_ACKNOWLEDGE);

        //第四步:通过Session创建Destination对象,queue1可看作是放入队列的消息名称,可以自定义
        Destination desiDestination = session.createQueue("queue1");

        //第五步:通过session创建消息的生产者或消费者,下面是创建生产者
        MessageProducer messageProducer = session.createProducer(null);

        //第六步:使用MessageProducer的set方法为其设置持久化特性和非持久化特性,后面再详细介绍
    //  messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);//这里先设置为非持久化

        //第七步:通过JMS规范的TextMessage形式创建数据(通过session对象),并用MessageProducer的send方法发送数据
        for(int i =0; i<5;i++){
            //第一个参数:目的地
            //第二个参数:消息文本
            //第三个参数:接收模式
            //第四个参数:优先级
            //第五个参数:消息在消息队列保存的时间,这里指保存2分钟
            TextMessage textMessage =  session.createTextMessage("这是消息内容,id为"+i);
            messageProducer.send(desiDestination,textMessage,DeliveryMode.NON_PERSISTENT, i,1000*60*2);
        }
        session.commit();
        if(connection!=null){
            connection.close();
        }

    }

}
  •  

需要注意以下内容: 

  1. 优先级并不是严格遵循的 
  2. 消息在队列中超过保存事件后,还没有人取,那么就会自动消失

4.MessageConsumer

三、订阅模式的使用

上面及之前的文章的是p2p模式(点对点模式) 
那么发布订阅模式又是如何处理的?下面先看发布/订阅模式相关的含义: 


 
下面是demo案例 
消息生产者

package bhz.mq.pd;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class Publish {

    private ConnectionFactory factory;
    private Connection connection;
    private Session session;
    private MessageProducer producer;

    public Publish(){
        try {
            factory = new ActiveMQConnectionFactory("bhz",
                    "bhz","tcp://localhost:61616");
            connection = factory.createConnection();
            connection.start();
            session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
            producer = session.createProducer(null);
        } catch (JMSException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }

    }

    public void sendMessage() throws Exception{
        Destination destination = session.createTopic("topic");
        TextMessage textMessage = session.createTextMessage("我是内容");
        producer.send(destination, textMessage);
    }

    public static void main(String[] agrs) throws Exception{
        Publish p = new Publish();
        p.sendMessage();
    }
}
  •  

上面的主题是:topic 
当发布一条消息的时候,便会在后台topics上显示 

下面是消费者的代码,消费者订阅了topic主题

package bhz.mq.pd;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;

public class Consumer1 {

    private ConnectionFactory factory;
    private Connection connection;
    private Session session;
    private MessageConsumer consumer;

    public Consumer1(){
        try {
            factory = new ActiveMQConnectionFactory("bhz",
                    "bhz","tcp://localhost:61616");
            connection = factory.createConnection();
            connection.start();
            session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
        } catch (Exception e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }

    }
    public void receive() throws Exception{
        Destination destionation = session.createTopic("topic");
        consumer = session.createConsumer(destionation);
        consumer.setMessageListener(new Listener());
    }

    public static void main(String[] agrs) throws Exception{
        Consumer1 consumer1 = new Consumer1();
        consumer1.receive();
    }
    class Listener implements MessageListener{
        @Override
        public void onMessage(Message msg) {
            // TODO Auto-generated method stub
            System.out.println("这是接收的内容:"+msg);
        }

    }
}

这时候只要生产者发布消息,那么消费者就会接收到消息

相关文章