Usage and code examples of the org.apache.activemq.artemis.api.core.Message.copy() method

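This article collects code examples of the Java method org.apache.activemq.artemis.api.core.Message.copy() and shows how Message.copy() is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven, extracted from selected projects, so they should serve as a useful reference. Details of Message.copy():
Package path: org.apache.activemq.artemis.api.core.Message
Class name: Message
Method name: copy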

Message.copy overview

Generates a new instance of the message encode as a deep copy: new properties, new everything.
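To make the deep-copy contract concrete, here is a minimal sketch. It does not use the Artemis library: `Msg` is a hypothetical stand-in class whose method names merely mirror `putStringProperty`, `getStringProperty`, `getMessageID`, `copy()` and `copy(long)`. The assumption being illustrated is only that a copy owns its own property map, so changing the copy never affects the original.

```java
import java.util.HashMap;
import java.util.Map;

final class DeepCopySketch {

    // Hypothetical stand-in for a message; NOT the real Artemis Message interface.
    static final class Msg {
        private final long id;
        private final Map<String, String> properties = new HashMap<>();

        Msg(long id) {
            this.id = id;
        }

        Msg putStringProperty(String key, String value) {
            properties.put(key, value);
            return this;
        }

        String getStringProperty(String key) {
            return properties.get(key);
        }

        long getMessageID() {
            return id;
        }

        // In this sketch, copy() keeps the id and copy(newId) assigns a fresh one.
        Msg copy() {
            return copy(id);
        }

        Msg copy(long newId) {
            Msg c = new Msg(newId);
            c.properties.putAll(properties); // new map, so the copy is independent
            return c;
        }
    }

    public static void main(String[] args) {
        Msg original = new Msg(1L).putStringProperty("color", "red");
        Msg copy = original.copy();
        copy.putStringProperty("color", "blue");
        System.out.println(original.getStringProperty("color"));
        System.out.println(copy.getStringProperty("color"));
    }
}
```

Running `main` shows the two messages holding different values for the same property key, which is the behaviour the description above promises.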

Code examples

Code example source: apache/activemq-artemis

    Message msg = ref.getMessage().copy();

Code example source: apache/activemq-artemis

    @Override
    protected Message beforeForward(final Message message, final SimpleString forwardingAddress) {
        // We make a copy of the message, then we strip out the unwanted routing id headers and leave
        // only the one pertinent for the address node - this is important since different queues on
        // different nodes could have the same queue ids.
        // Note we must copy since the same message may get routed to other nodes which require different headers
        Message messageCopy = message.copy();
        if (logger.isTraceEnabled()) {
            logger.trace("Clustered bridge copied message " + message + " as " + messageCopy + " before delivery");
        }
        // TODO - we can optimise this
        Set<SimpleString> propNames = new HashSet<>(messageCopy.getPropertyNames());
        byte[] queueIds = message.getExtraBytesProperty(idsHeaderName);
        if (queueIds == null) {
            // Sanity check only
            ActiveMQServerLogger.LOGGER.noQueueIdDefined(message, messageCopy, idsHeaderName);
            throw new IllegalStateException("no queueIDs defined");
        }
        for (SimpleString propName : propNames) {
            if (propName.startsWith(Message.HDR_ROUTE_TO_IDS)) {
                messageCopy.removeProperty(propName);
            }
        }
        messageCopy.putExtraBytesProperty(Message.HDR_ROUTE_TO_IDS, queueIds);
        messageCopy = super.beforeForward(messageCopy, forwardingAddress);
        return messageCopy;
    }
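The copy-then-strip pattern in the example above can be sketched without the broker classes. In this simplified illustration a plain `Map<String, String>` stands in for the message properties, and `ROUTE_PREFIX` and `copyForNode` are hypothetical names (the prefix is a placeholder, not the actual value of `Message.HDR_ROUTE_TO_IDS`). It also shows why the original iterates over a snapshot of the property names: removing entries while iterating the live key set would fail.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;

final class RouteHeaderSketch {

    // Placeholder prefix for this sketch only.
    static final String ROUTE_PREFIX = "ROUTE_TO";

    // Returns a copy of the properties that keeps a single routing header,
    // taken from the entry named by idsHeaderName. The input map is not modified.
    static Map<String, String> copyForNode(Map<String, String> properties, String idsHeaderName) {
        String queueIds = properties.get(idsHeaderName);
        if (queueIds == null) {
            throw new IllegalStateException("no queueIDs defined");
        }
        Map<String, String> copy = new HashMap<>(properties);
        // Iterate over a snapshot of the keys so that removal is safe.
        for (String name : new HashSet<>(copy.keySet())) {
            if (name.startsWith(ROUTE_PREFIX)) {
                copy.remove(name);
            }
        }
        copy.put(ROUTE_PREFIX, queueIds);
        return copy;
    }
}
```

Because the work happens on a copy, the same source properties can be passed to `copyForNode` again with a different header name for another node.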

Code example source: apache/activemq-artemis

    originalMessage.putStringProperty(Message.HDR_ROUTING_TYPE.toString(), routingTypeString);
    Message message = originalMessage.copy();

Code example source: apache/activemq-artemis

    private Message makeCopy(final MessageReference ref,
                             final boolean expiry,
                             final boolean copyOriginalHeaders) throws Exception {
        if (ref == null) {
            ActiveMQServerLogger.LOGGER.nullRefMessage();
            throw new ActiveMQNullRefException("Reference to message is null");
        }
        Message message = ref.getMessage();
        /*
         We copy the message and send that to the dla/expiry queue - this is
         because otherwise we may end up with a ref with the same message id in the
         queue more than once which would barf - this might happen if the same message had been
         expired from multiple subscriptions of a topic for example
         We set headers that hold the original message address, expiry time
         and original message id
        */
        long newID = storageManager.generateID();
        Message copy = message.copy(newID);
        if (copyOriginalHeaders) {
            copy.referenceOriginalMessage(message, ref.getQueue().getName().toString());
        }
        copy.setExpiration(0);
        if (expiry) {
            copy.setAnnotation(Message.HDR_ACTUAL_EXPIRY_TIME, System.currentTimeMillis());
        }
        copy.reencode();
        return copy;
    }
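The reason for `copy(newID)` in the example above, avoiding two references with the same message id in one queue, can be sketched in isolation. Everything here is a hypothetical stand-in: `Msg` is not the Artemis `Message`, the `AtomicLong` replaces `storageManager.generateID()`, and the annotation key is a placeholder string rather than `Message.HDR_ACTUAL_EXPIRY_TIME`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

final class ExpiryCopySketch {

    // Hypothetical stand-in for a message.
    static final class Msg {
        final long id;
        long expiration;
        final Map<String, Object> annotations = new HashMap<>();

        Msg(long id, long expiration) {
            this.id = id;
            this.expiration = expiration;
        }

        Msg copy(long newId) {
            Msg c = new Msg(newId, expiration);
            c.annotations.putAll(annotations);
            return c;
        }
    }

    // Stand-in for the broker's id generator.
    private final AtomicLong idGenerator = new AtomicLong(1000);

    // Builds the message that would go to the expiry or dead-letter queue.
    Msg makeCopy(Msg message, boolean expiry) {
        Msg copy = message.copy(idGenerator.incrementAndGet()); // fresh id per copy
        copy.expiration = 0; // the copy itself must not expire again
        if (expiry) {
            copy.annotations.put("ACTUAL_EXPIRY_TIME", System.currentTimeMillis());
        }
        return copy;
    }
}
```

If the same message expires from two subscriptions, calling `makeCopy` twice yields two messages with distinct ids, so both can sit in the same target queue.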

Code example source: apache/activemq-artemis

    Message message = messageRef.getMessage().copy();

Code example source: apache/activemq-artemis

    Message copyRedistribute = message.copy(storageManager.generateID());
    copyRedistribute.setAddress(originatingQueue.getAddress());

Code example source: apache/activemq-artemis

    copy = message.copy(id);

Code example source: apache/activemq-artemis

    /**
     * FIXME
     * Retained messages should be handled in the core API. There is currently no support for retained messages
     * at the time of writing. Instead we handle retained messages here. This method will create a new queue for
     * every address that is used to store retained messages. There should only ever be one message in the retained
     * message queue. When a new subscription is created the queue should be browsed and the message copied onto
     * the subscription queue for the consumer. When a new retained message is received the message will be sent to
     * the retained queue and the previous retained message consumed to remove it from the queue.
     */
    void handleRetainedMessage(Message message, String address, boolean reset, Transaction tx) throws Exception {
        SimpleString retainAddress = new SimpleString(MQTTUtil.convertMQTTAddressFilterToCoreRetain(address, session.getWildcardConfiguration()));
        Queue queue = session.getServer().locateQueue(retainAddress);
        if (queue == null) {
            queue = session.getServer().createQueue(retainAddress, retainAddress, null, true, false);
        }
        try (LinkedListIterator<MessageReference> iterator = queue.iterator()) {
            synchronized (queue) {
                if (iterator.hasNext()) {
                    MessageReference ref = iterator.next();
                    iterator.remove();
                    queue.acknowledge(tx, ref);
                }
                if (!reset) {
                    sendToQueue(message.copy(session.getServer().getStorageManager().generateID()), queue, tx);
                }
            }
        }
    }

Code example source: apache/activemq-artemis

    final SimpleString address = SimpleString.toSimpleString(physicalName, coreMessageObjectPools.getAddressStringSimpleStringPool());
    final org.apache.activemq.artemis.api.core.Message coreMsg = (i == actualDestinationsCount - 1) ? originalCoreMsg : originalCoreMsg.copy();
    coreMsg.setAddress(address);
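The snippet above copies the message for every destination except the last, which reuses the original and so saves one copy. A minimal sketch of that fan-out loop follows; `Msg` and `fanOut` are hypothetical names for illustration and do not come from the Artemis code base.

```java
import java.util.ArrayList;
import java.util.List;

final class FanOutSketch {

    // Hypothetical stand-in for a message with an address and a body.
    static final class Msg {
        String address;
        final String body;

        Msg(String body) {
            this.body = body;
        }

        Msg copy() {
            Msg c = new Msg(body);
            c.address = address;
            return c;
        }
    }

    // Produces one message per destination. Every destination but the last
    // gets a copy; the last one reuses the original instance.
    static List<Msg> fanOut(Msg original, List<String> destinations) {
        List<Msg> out = new ArrayList<>(destinations.size());
        for (int i = 0; i < destinations.size(); i++) {
            boolean last = (i == destinations.size() - 1);
            Msg m = last ? original : original.copy();
            m.address = destinations.get(i);
            out.add(m);
        }
        return out;
    }
}
```

Reusing the original is safe only because it is handled last: all copies are taken before its address is changed.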

Code example source: apache/activemq-artemis

    void addRetainedMessagesToQueue(Queue queue, String address) throws Exception {
        // The address filter that matches all retained message queues.
        String retainAddress = MQTTUtil.convertMQTTAddressFilterToCoreRetain(address, session.getWildcardConfiguration());
        BindingQueryResult bindingQueryResult = session.getServerSession().executeBindingQuery(new SimpleString(retainAddress));
        // Iterate over all matching retain queues and add the queue
        Transaction tx = session.getServerSession().newTransaction();
        try {
            synchronized (queue) {
                for (SimpleString retainedQueueName : bindingQueryResult.getQueueNames()) {
                    Queue retainedQueue = session.getServer().locateQueue(retainedQueueName);
                    try (LinkedListIterator<MessageReference> i = retainedQueue.iterator()) {
                        if (i.hasNext()) {
                            Message message = i.next().getMessage().copy(session.getServer().getStorageManager().generateID());
                            sendToQueue(message, queue, tx);
                        }
                    }
                }
            }
        } catch (Throwable t) {
            tx.rollback();
            throw t;
        }
        tx.commit();
    }
