文章14 | 阅读 4798 | 点赞0
ActiveMQ是使用jetty部署的,修改密码需要到相应的配置文件
配置文件是这个:
在其第123行添加用户名和密码,添加配置如下:
<plugins>
<simpleAuthenticationPlugin>
<users>
<authenticationUser username="bhz" password="bhz" groups="users,admins"/>
</users>
</simpleAuthenticationPlugin>
</plugins>
这时候这个需要改为这样才能进行连接:
//第一步:建立ConnectionFactory工厂对象
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("bhz",
"bhz","tcp://localhost:61616");
省略
一旦从ConnectionFactory中获得一个Connection,必须从Connection中创建一个或多个Session.Session是一个发送或接收消息的线程
session可以被事务化,也可以不被事务化
如果使用事务的话,那么需要commit,如下
package test.mq.helloworld;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
public class Sender {
public static void main(String[] agrs) throws Exception{
//第一步:建立ConnectionFactory工厂对象
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("bhz",
"bhz","tcp://localhost:61616");
//第二步:通过ConnectionFactory工厂对象我们创建一个Connection对象
Connection connection = connectionFactory.createConnection();
connection.start();
//第三步:通过connection对象创建Session会话,第一个参数为是否开启事务,第二个参数为签收模式,一般设置为自动签收
Session session = connection.createSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE);
//第四步:通过Session创建Destination对象,queue1可看作是放入队列的消息名称,可以自定义
Destination desiDestination = session.createQueue("queue1");
//第五步:通过session创建消息的生产者或消费者,下面是创建生产者
MessageProducer messageProducer = session.createProducer(desiDestination);
//第六步:使用MessageProducer的set方法为其设置持久化特性和非持久化特性,后面再详细介绍
messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);//这里先设置为非持久化
//第七步:通过JMS规范的TextMessage形式创建数据(通过session对象),并用MessageProducer的send方法发送数据
for(int i =0; i<5;i++){
TextMessage textMessage = session.createTextMessage("这是消息内容,id为"+i);
messageProducer.send(textMessage);
}
session.commit();
if(connection!=null){
connection.close();
}
}
}
注意session.commit();
session的签收模式有三种情况:
1. Session.AUTO_ACKNOWLEDGE :自动签收
2. Session.CLIENT_ACKNOWLEDGE:客户端通过调用消息(Message)的acknowledge方法签收消息,在这种情况下,签收发生在Session层面:签收一个已消费的消息会自动签收这个Session所有已消费消息的收条
3. Session.DUPS_OK_ACKNOWLEDGE:此选项指示Session不必确保对传送消息的签收。它可能引起消息的重复,但是降低了Session的开销,所以只有客户端能容忍重复的消息,才可使用
上面三种模式,推荐使用的是Session.CLIENT_ACKNOWLEDGE
它的操作如下:
package test.mq.helloworld;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
public class Receiver {
public static void main(String[] agrs) throws Exception{
//第一步:建立ConnectionFactory工厂对象
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("bhz",
"bhz","tcp://localhost:61616");
//第二步:通过ConnectionFactory工厂对象我们创建一个Connection对象
Connection connection = connectionFactory.createConnection();
connection.start();
//第三步:通过connection对象创建Session会话,第一个参数为是否开启事务,第二个参数为签收模式,一般设置为自动签收
Session session = connection.createSession(Boolean.FALSE,Session.CLIENT_ACKNOWLEDGE);
//第四步:通过Session创建Destination对象,queue1可看作是放入队列的消息名称,可以自定义
Destination desiDestination = session.createQueue("queue1");
//第五步:通过session创建消息的生产者或消费者,下面是创建消费者
MessageConsumer messageConsumer = session.createConsumer(desiDestination);
while(true){
TextMessage msg = (TextMessage) messageConsumer.receive();
msg.acknowledge();
if(msg==null) {
break;
}
System.out.println("收到的内容为"+msg.getText());
}
if(connection!=null){
connection.close();
}
}
}
使用该模式,需要手动的设置接收完毕的信号msg.acknowledge();,这可以当做告诉队列,这个消息已经接收完毕,可以销毁了。
下面是改造后的代码:
package test.mq.helloworld;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
public class Sender {
public static void main(String[] agrs) throws Exception{
//第一步:建立ConnectionFactory工厂对象
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("bhz",
"bhz","tcp://localhost:61616");
//第二步:通过ConnectionFactory工厂对象我们创建一个Connection对象
Connection connection = connectionFactory.createConnection();
connection.start();
//第三步:通过connection对象创建Session会话,第一个参数为是否开启事务,第二个参数为签收模式,一般设置为自动签收
Session session = connection.createSession(Boolean.TRUE,Session.CLIENT_ACKNOWLEDGE);
//第四步:通过Session创建Destination对象,queue1可看作是放入队列的消息名称,可以自定义
Destination desiDestination = session.createQueue("queue1");
//第五步:通过session创建消息的生产者或消费者,下面是创建生产者
MessageProducer messageProducer = session.createProducer(null);
//第六步:使用MessageProducer的set方法为其设置持久化特性和非持久化特性,后面再详细介绍
// messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);//这里先设置为非持久化
//第七步:通过JMS规范的TextMessage形式创建数据(通过session对象),并用MessageProducer的send方法发送数据
for(int i =0; i<5;i++){
//第一个参数:目的地
//第二个参数:消息文本
//第三个参数:接收模式
//第四个参数:优先级
//第五个参数:消息在消息队列保存的时间,这里指保存2分钟
TextMessage textMessage = session.createTextMessage("这是消息内容,id为"+i);
messageProducer.send(desiDestination,textMessage,DeliveryMode.NON_PERSISTENT, i,1000*60*2);
}
session.commit();
if(connection!=null){
connection.close();
}
}
}
需要注意以下内容:
上面及之前的文章的是p2p模式(点对点模式)
那么发布订阅模式又是如何处理的?下面先看发布/订阅模式相关的含义:
下面是demo案例
消息生产者
package bhz.mq.pd;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
public class Publish {
private ConnectionFactory factory;
private Connection connection;
private Session session;
private MessageProducer producer;
public Publish(){
try {
factory = new ActiveMQConnectionFactory("bhz",
"bhz","tcp://localhost:61616");
connection = factory.createConnection();
connection.start();
session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
producer = session.createProducer(null);
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
public void sendMessage() throws Exception{
Destination destination = session.createTopic("topic");
TextMessage textMessage = session.createTextMessage("我是内容");
producer.send(destination, textMessage);
}
public static void main(String[] agrs) throws Exception{
Publish p = new Publish();
p.sendMessage();
}
}
上面的主题是:topic
当发布一条消息的时候,便会在后台topics上显示
下面是消费者的代码,消费者订阅了topic主题
package bhz.mq.pd;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
public class Consumer1 {
private ConnectionFactory factory;
private Connection connection;
private Session session;
private MessageConsumer consumer;
public Consumer1(){
try {
factory = new ActiveMQConnectionFactory("bhz",
"bhz","tcp://localhost:61616");
connection = factory.createConnection();
connection.start();
session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
public void receive() throws Exception{
Destination destionation = session.createTopic("topic");
consumer = session.createConsumer(destionation);
consumer.setMessageListener(new Listener());
}
public static void main(String[] agrs) throws Exception{
Consumer1 consumer1 = new Consumer1();
consumer1.receive();
}
class Listener implements MessageListener{
@Override
public void onMessage(Message msg) {
// TODO Auto-generated method stub
System.out.println("这是接收的内容:"+msg);
}
}
}
这时候只要生产者发布消息,那么消费者就会接收到消息
版权说明 : 本文为转载文章, 版权归原作者所有 版权申明
原文链接 : https://blog.csdn.net/ywl470812087/article/details/84728837
内容来源于网络,如有侵权,请联系作者删除!