本文整理了Java中org.apache.activemq.artemis.api.core.Message.copy()
方法的一些代码示例,展示了Message.copy()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Message.copy()
方法的具体详情如下:
包路径:org.apache.activemq.artemis.api.core.Message
类名称:Message
方法名:copy
[英]It will generate a new instance of the message encode, being a deep copy, new properties, new everything
[中]它将生成消息编码的一个新实例,即深度副本、新属性和新的一切
代码示例来源:origin: apache/activemq-artemis
Message msg = ref.getMessage().copy();
代码示例来源:origin: apache/activemq-artemis
@Override
protected Message beforeForward(final Message message, final SimpleString forwardingAddress) {
// We make a copy of the message, then we strip out the unwanted routing id headers and leave
// only
// the one pertinent for the address node - this is important since different queues on different
// nodes could have same queue ids
// Note we must copy since same message may get routed to other nodes which require different headers
Message messageCopy = message.copy();
if (logger.isTraceEnabled()) {
logger.trace("Clustered bridge copied message " + message + " as " + messageCopy + " before delivery");
}
// TODO - we can optimise this
Set<SimpleString> propNames = new HashSet<>(messageCopy.getPropertyNames());
byte[] queueIds = message.getExtraBytesProperty(idsHeaderName);
if (queueIds == null) {
// Sanity check only
ActiveMQServerLogger.LOGGER.noQueueIdDefined(message, messageCopy, idsHeaderName);
throw new IllegalStateException("no queueIDs defined");
}
for (SimpleString propName : propNames) {
if (propName.startsWith(Message.HDR_ROUTE_TO_IDS)) {
messageCopy.removeProperty(propName);
}
}
messageCopy.putExtraBytesProperty(Message.HDR_ROUTE_TO_IDS, queueIds);
messageCopy = super.beforeForward(messageCopy, forwardingAddress);
return messageCopy;
}
代码示例来源:origin: apache/activemq-artemis
originalMessage.putStringProperty(Message.HDR_ROUTING_TYPE.toString(), routingTypeString);
Message message = originalMessage.copy();
代码示例来源:origin: apache/activemq-artemis
private Message makeCopy(final MessageReference ref,
final boolean expiry,
final boolean copyOriginalHeaders) throws Exception {
if (ref == null) {
ActiveMQServerLogger.LOGGER.nullRefMessage();
throw new ActiveMQNullRefException("Reference to message is null");
}
Message message = ref.getMessage();
/*
We copy the message and send that to the dla/expiry queue - this is
because otherwise we may end up with a ref with the same message id in the
queue more than once which would barf - this might happen if the same message had been
expire from multiple subscriptions of a topic for example
We set headers that hold the original message address, expiry time
and original message id
*/
long newID = storageManager.generateID();
Message copy = message.copy(newID);
if (copyOriginalHeaders) {
copy.referenceOriginalMessage(message, ref.getQueue().getName().toString());
}
copy.setExpiration(0);
if (expiry) {
copy.setAnnotation(Message.HDR_ACTUAL_EXPIRY_TIME, System.currentTimeMillis());
}
copy.reencode();
return copy;
}
代码示例来源:origin: apache/activemq-artemis
Message message = messageRef.getMessage().copy();
代码示例来源:origin: apache/activemq-artemis
Message copyRedistribute = message.copy(storageManager.generateID());
copyRedistribute.setAddress(originatingQueue.getAddress());
代码示例来源:origin: apache/activemq-artemis
copy = message.copy(id);
代码示例来源:origin: apache/activemq-artemis
/**
* FIXME
* Retained messages should be handled in the core API. There is currently no support for retained messages
* at the time of writing. Instead we handle retained messages here. This method will create a new queue for
* every address that is used to store retained messages. THere should only ever be one message in the retained
* message queue. When a new subscription is created the queue should be browsed and the message copied onto
* the subscription queue for the consumer. When a new retained message is received the message will be sent to
* the retained queue and the previous retain message consumed to remove it from the queue.
*/
void handleRetainedMessage(Message message, String address, boolean reset, Transaction tx) throws Exception {
SimpleString retainAddress = new SimpleString(MQTTUtil.convertMQTTAddressFilterToCoreRetain(address, session.getWildcardConfiguration()));
Queue queue = session.getServer().locateQueue(retainAddress);
if (queue == null) {
queue = session.getServer().createQueue(retainAddress, retainAddress, null, true, false);
}
try (LinkedListIterator<MessageReference> iterator = queue.iterator()) {
synchronized (queue) {
if (iterator.hasNext()) {
MessageReference ref = iterator.next();
iterator.remove();
queue.acknowledge(tx, ref);
}
if (!reset) {
sendToQueue(message.copy(session.getServer().getStorageManager().generateID()), queue, tx);
}
}
}
}
代码示例来源:origin: org.apache.activemq/artemis-mqtt-protocol
/**
* FIXME
* Retained messages should be handled in the core API. There is currently no support for retained messages
* at the time of writing. Instead we handle retained messages here. This method will create a new queue for
* every address that is used to store retained messages. THere should only ever be one message in the retained
* message queue. When a new subscription is created the queue should be browsed and the message copied onto
* the subscription queue for the consumer. When a new retained message is received the message will be sent to
* the retained queue and the previous retain message consumed to remove it from the queue.
*/
void handleRetainedMessage(Message message, String address, boolean reset, Transaction tx) throws Exception {
SimpleString retainAddress = new SimpleString(MQTTUtil.convertMQTTAddressFilterToCoreRetain(address, session.getWildcardConfiguration()));
Queue queue = session.getServer().locateQueue(retainAddress);
if (queue == null) {
queue = session.getServer().createQueue(retainAddress, retainAddress, null, true, false);
}
try (LinkedListIterator<MessageReference> iterator = queue.iterator()) {
synchronized (queue) {
if (iterator.hasNext()) {
MessageReference ref = iterator.next();
iterator.remove();
queue.acknowledge(tx, ref);
}
if (!reset) {
sendToQueue(message.copy(session.getServer().getStorageManager().generateID()), queue, tx);
}
}
}
}
代码示例来源:origin: apache/activemq-artemis
final SimpleString address = SimpleString.toSimpleString(physicalName, coreMessageObjectPools.getAddressStringSimpleStringPool());
final org.apache.activemq.artemis.api.core.Message coreMsg = (i == actualDestinationsCount - 1) ? originalCoreMsg : originalCoreMsg.copy();
coreMsg.setAddress(address);
代码示例来源:origin: apache/activemq-artemis
void addRetainedMessagesToQueue(Queue queue, String address) throws Exception {
// The address filter that matches all retained message queues.
String retainAddress = MQTTUtil.convertMQTTAddressFilterToCoreRetain(address, session.getWildcardConfiguration());
BindingQueryResult bindingQueryResult = session.getServerSession().executeBindingQuery(new SimpleString(retainAddress));
// Iterate over all matching retain queues and add the queue
Transaction tx = session.getServerSession().newTransaction();
try {
synchronized (queue) {
for (SimpleString retainedQueueName : bindingQueryResult.getQueueNames()) {
Queue retainedQueue = session.getServer().locateQueue(retainedQueueName);
try (LinkedListIterator<MessageReference> i = retainedQueue.iterator()) {
if (i.hasNext()) {
Message message = i.next().getMessage().copy(session.getServer().getStorageManager().generateID());
sendToQueue(message, queue, tx);
}
}
}
}
} catch (Throwable t) {
tx.rollback();
throw t;
}
tx.commit();
}
代码示例来源:origin: org.apache.activemq/artemis-mqtt-protocol
void addRetainedMessagesToQueue(Queue queue, String address) throws Exception {
// The address filter that matches all retained message queues.
String retainAddress = MQTTUtil.convertMQTTAddressFilterToCoreRetain(address, session.getWildcardConfiguration());
BindingQueryResult bindingQueryResult = session.getServerSession().executeBindingQuery(new SimpleString(retainAddress));
// Iterate over all matching retain queues and add the queue
Transaction tx = session.getServerSession().newTransaction();
try {
synchronized (queue) {
for (SimpleString retainedQueueName : bindingQueryResult.getQueueNames()) {
Queue retainedQueue = session.getServer().locateQueue(retainedQueueName);
try (LinkedListIterator<MessageReference> i = retainedQueue.iterator()) {
if (i.hasNext()) {
Message message = i.next().getMessage().copy(session.getServer().getStorageManager().generateID());
sendToQueue(message, queue, tx);
}
}
}
}
} catch (Throwable t) {
tx.rollback();
throw t;
}
tx.commit();
}
内容来源于网络,如有侵权,请联系作者删除!