jboss 消息驱动Bean未从集群中的其他节点接收消息

yizd12fk  于 2022-11-23  发布在  其他
关注(0)|答案(1)|浏览(181)

我有一个Wildfly 23集群,其中有两个节点(节点1和节点2)运行standalone-full-ha配置文件。这两个集群节点正确引导并相互通信(据我判断)。
我的意图是从节点1发送一个主题上的JMS消息,并让节点1和节点2上的消息驱动Bean(MDB)使用此消息。
MDB代码:

import javax.ejb.ActivationConfigProperty;
import javax.ejb.MessageDriven;
import javax.jms.Message;
import javax.jms.MessageListener;

@MessageDriven(activationConfig = {
        @ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Topic"), 
        @ActivationConfigProperty(propertyName = "destinationLookup", 
            propertyValue = "java:/jms/topic/myTopic"), 
        @ActivationConfigProperty(propertyName = "maxSession", propertyValue = "1")
})
public class ClusteredEventListener implements MessageListener {

    @Override
    public void onMessage(final Message message) {
        // consume message
    }

}

消息发布者代码:

import java.io.Serializable;

import javax.annotation.Resource;
import javax.ejb.Startup;
import javax.enterprise.context.ApplicationScoped;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicSession;

@Startup
@ApplicationScoped
public class ClusteredEventSender {

    @Resource(lookup = "java:/jms/topic/myTopic")
    private Topic topic;

    @Resource(mappedName = "java:/ConnectionFactory")
    private TopicConnectionFactory connectionFactory;

    public void broadcast(final Serializable event) {
        try {
            try (TopicConnection connection = this.connectionFactory.createTopicConnection()) { 
                try (TopicSession session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE)) {
                    try (MessageProducer messageProducer = session.createPublisher(this.topic)) { 
                        final ObjectMessage message = session.createObjectMessage(event);
                        messageProducer.send(message);
                    }
                }
            }
        } catch (final JMSException e) {
            log.error("Could not broadcast event to topic: " + event, e);
        }
    }

}

来自独立. xml的代码段:

<subsystem xmlns="urn:jboss:domain:messaging-activemq:13.0">
    <server name="default">
        ...
        <jms-topic name="myTopic" entries="java:/jms/topic/myTopic"/>
        ...
    </server>
</subsystem>
  • 结果是消息仅在生成它们的节点上使用。其他节点不会接收任何消息。*
    • 实验**

我尝试使用带有持久订阅的java:jboss/exported/jms/RemoteConnectionFactory,每个节点和用户"jmsuser"使用唯一的clientIDsubscriptionName,并使用java中的主题:jboss/exported(java:jboss/exported/jms/topic/myTopic),更改/扩展MDB上的注解,如下所示:

...
@ActivationConfigProperty(propertyName = "destinationLookup", propertyValue = "java:jboss/exported/jms/topic/myTopic"),
@ActivationConfigProperty(propertyName = "subscriptionDurability", propertyValue = "Durable"), 
@ActivationConfigProperty(propertyName = "subscriptionName", propertyValue = "subscription-${jboss.node.name}"), 
@ActivationConfigProperty(propertyName = "clientID", propertyValue = "node-${jboss.node.name}"), 
@ActivationConfigProperty(propertyName = "connectionFactoryLookup", propertyValue = "java:jboss/exported/jms/RemoteConnectionFactory"), 
@ActivationConfigProperty(propertyName = "user", propertyValue = "jmsuser"), 
@ActivationConfigProperty(propertyName = "password", propertyValue = "jmsuser")
...

注意:"jmsuser"是使用批处理脚本add-user.bat在wildfly/bin目录中添加的。它已被分配角色"guest"。guest角色已针对持久队列进行了扩展。注解中的属性替换(使clientIDsubscriptionName中的${jboss.node.name}正常工作)已在standalone. xml中激活:

<subsystem xmlns="urn:jboss:domain:ee:6.0">
    ...
    <annotation-property-replacement>true</annotation-property-replacement>
    ...
</subsystem>
...
<subsystem xmlns="urn:jboss:domain:messaging-activemq:13.0">
    <server name="default">
        ...
        <security-setting name="#">
            ...
            <role name="guest" delete-non-durable-queue="true"
               create-non-durable-queue="true"
               delete-durable-queue="true"
               create-durable-queue="true"
               consume="true"
               send="true" />
            ...
        </security-setting>
        ...
        <jms-topic name="myTopic" entries="java:jboss/exported/jms/topic/myTopic"/>
        ...
    </server>
</subsystem>
...

修改后的消息发布者代码:

public class ClusteredEventSender {

    @Resource(lookup = "java:jboss/exported/jms/topic/myTopic")
    private Topic topic;

    @Resource(lookup = "java:jboss/exported/jms/RemoteConnectionFactory")
    private TopicConnectionFactory connectionFactory;

    public void broadcast(final Serializable event) {
        try {
            try (TopicConnection connection = this.connectionFactory.createTopicConnection("jmsuser", "jmsuser")) {
                try (TopicSession session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE)) {
                    try (MessageProducer messageProducer = session.createPublisher(this.topic)) { 
                        final ObjectMessage message = session.createObjectMessage(event);
                        messageProducer.send(message);
                    }
                }
            }
        } catch (final JMSException e) {
            log.error("Could not broadcast event to topic: " + event, e);
        }
    }

}
  • 实验结果:与RemoteConnectionFactory的连接正常工作,但行为仍与以前相同。*
    • 我的问题是:**如何在Wildfly集群中实现使用JMS/ActiveMQ的发布/订阅?消息驱动Bean是什么样子的?如何发送消息?需要什么配置?
juzqafwq

juzqafwq1#

感谢@ehsavoie的提示,我们设法解决了这个问题。

  • 没有必要使用java:jboss/exported/jms/topic/myTopicjava:jboss/exported/jms/RemoteConnectionFactory。也没有任何@ActivationConfigProperty-subscriptionNameclientIduserpassword的条目。问题中描述的所有实验都是死胡同。
  • 创建一个从节点1到节点2的连接器(如注解中所建议的)有一些效果,因为节点1能够将事件发送到节点2。然而,由于我们使用的是JGroups发现(如在full-ha中),这只是发现工作不正常的一个线索。
  • 在更改了集群密码“CHANGE ME!!”(废话!),并在standalone.xml的<default-bindings>中添加了属性jms-connection-factory="java:jboss/DefaultJMSConnectionFactory"之后,一切都按预期运行。

因此,相关的standalone.xml片段为:

<cluster password="something else than CHANGE ME!!"/>

<default-bindings ... jms-connection-factory="java:jboss/DefaultJMSConnectionFactory" ... />

更新

为了更好地定位:上述片段的位置为:

<server xmlns="urn:jboss:domain:16.0">
    ...
    <profile>
        ...
        <subsystem xmlns="urn:jboss:domain:messaging-activemq:13.0">
            <server name="default">
                <cluster .../>
                ...
            </server>
        </subsystem>
        ...
        <subsystem xmlns="urn:jboss:domain:ee:6.0">
            <default-bindings .../>
        </subsystem>
        ...
    </profile>
    ...
</server>

相关问题