我有一个jms应用程序,它试图从jbosss队列中读取数据。我实现了 MessageListener
在我的课上使用 onMessage()
接收消息
public class JBossConnector implements MessageListener, AutoCloseable {}
我的方法是:
/**
* The listener method of JMS. It listens to messages from queue: 'jbossToAppia'
* If the message is of type MessageObject, then transfer that to Appia
*
* @param message JMS Message
*/
@Override
public void onMessage(Message message) {
// receive the message from jboss queue: 'jbossToAppia'
// then post it to appia
if (message instanceof ObjectMessage) {
try {
MessageObject messageObject = (MessageObject) ((ObjectMessage) message).getObject();
System.out.printf("JbossConnector: MessageObject received from JBOSS, %s\n", messageObject.getMessageType());
component.onMessageFromJboss(properties.getProperty("target.sessionID"), messageObject);
} catch (MessageFormatException exception) {
logger.error(ExceptionHandler.getFormattedException(exception));
ExceptionHandler.printException(exception);
} catch (JMSException exception) {
ExceptionHandler.printException(exception);
restart();
}
} else {
System.out.printf("%s: MessageFormatException(Message is not of the format MessageObject)\n", this.getClass().getSimpleName());
}
}
每当我找到一个 JMSException
我尝试重新启动jboss连接(context、connection、session、receiver、sender)。我怀疑的是我读过 onMessage()
使用多个线程从队列接收消息(如果我错了,请纠正我)。
当jboss队列连接断开时,至少会有一些队列抛出此异常。这意味着他们都会努力 restart()
连接是浪费时间( restart()
首先关闭所有连接,将变量设置为null,然后尝试启动连接)。
现在我可以做一些像
synchronized (this){
restart();
}
或使用 volatile
变量。但这并不能保证其他线程不会尝试 restart()
当当前线程完成 restart()
操作(如果我错了,请再次纠正我)。
有什么办法可以让这一切顺利进行吗?
1条答案
按热度按时间7qhs6swi1#
这个
onMessage()
的MessageListener
确实是从它自己的线程运行的,所以您需要适当的并发控制。我认为最简单的解决办法就是使用java.util.concurrent.atomic.AtomicBoolean
. 例如,在你的restart()
方法你可以这样做:这将使
restart()
方法有效幂等。多个线程将能够调用restart()
但是只有第一个调用它的线程才会真正导致资源被重新创建。所有其他电话将立即返回。