org.apache.qpid.proton.message.Message.setMessageAnnotations()方法的使用及代码示例

x33g5p2x  于2022-01-25 转载在 其他  
字(9.9k)|赞(0)|评价(0)|浏览(152)

本文整理了Java中org.apache.qpid.proton.message.Message.setMessageAnnotations()方法的一些代码示例,展示了Message.setMessageAnnotations()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Message.setMessageAnnotations()方法的具体详情如下:
包路径:org.apache.qpid.proton.message.Message
类名称:Message
方法名:setMessageAnnotations

Message.setMessageAnnotations介绍

暂无

代码示例

代码示例来源:origin: apache/activemq-artemis

  1. private void lazyCreateMessageAnnotations() {
  2. if (messageAnnotationsMap == null) {
  3. messageAnnotationsMap = new HashMap<>();
  4. message.setMessageAnnotations(new MessageAnnotations(messageAnnotationsMap));
  5. }
  6. }

代码示例来源:origin: org.eclipse.hono/hono-core

  1. /**
  2. * Adds a value for a symbol to an AMQP 1.0 message's <em>annotations</em>.
  3. *
  4. * @param msg the message to add the symbol to.
  5. * @param key the name of the symbol to add a value for.
  6. * @param value the value to add.
  7. */
  8. public static void addAnnotation(final Message msg, final String key, final Object value) {
  9. MessageAnnotations annotations = msg.getMessageAnnotations();
  10. if (annotations == null) {
  11. annotations = new MessageAnnotations(new HashMap<>());
  12. msg.setMessageAnnotations(annotations);
  13. }
  14. annotations.getValue().put(Symbol.getSymbol(key), value);
  15. }

代码示例来源:origin: Azure/azure-event-hubs-java

  1. Message toAmqpMessage(final String partitionKey) {
  2. final Message amqpMessage = this.toAmqpMessage();
  3. final MessageAnnotations messageAnnotations = (amqpMessage.getMessageAnnotations() == null)
  4. ? new MessageAnnotations(new HashMap<>())
  5. : amqpMessage.getMessageAnnotations();
  6. messageAnnotations.getValue().put(AmqpConstants.PARTITION_KEY, partitionKey);
  7. amqpMessage.setMessageAnnotations(messageAnnotations);
  8. return amqpMessage;
  9. }

代码示例来源:origin: eclipse/hono

  1. /**
  2. * Adds a value for a symbol to an AMQP 1.0 message's <em>annotations</em>.
  3. *
  4. * @param msg the message to add the symbol to.
  5. * @param key the name of the symbol to add a value for.
  6. * @param value the value to add.
  7. */
  8. public static void addAnnotation(final Message msg, final String key, final Object value) {
  9. MessageAnnotations annotations = msg.getMessageAnnotations();
  10. if (annotations == null) {
  11. annotations = new MessageAnnotations(new HashMap<>());
  12. msg.setMessageAnnotations(annotations);
  13. }
  14. annotations.getValue().put(Symbol.getSymbol(key), value);
  15. }

代码示例来源:origin: strimzi/strimzi-kafka-bridge

  1. @Override
  2. public Message toMessage(String address, KafkaConsumerRecord<String, byte[]> record) {
  3. Message message = Proton.message();
  4. message.setAddress(address);
  5. message.decode(record.value(), 0, record.value().length);
  6. // put message annotations about partition, offset and key (if not null)
  7. Map<Symbol, Object> map = new HashMap<>();
  8. map.put(Symbol.valueOf(AmqpBridge.AMQP_PARTITION_ANNOTATION), record.partition());
  9. map.put(Symbol.valueOf(AmqpBridge.AMQP_OFFSET_ANNOTATION), record.offset());
  10. map.put(Symbol.valueOf(AmqpBridge.AMQP_KEY_ANNOTATION), record.key());
  11. map.put(Symbol.valueOf(AmqpBridge.AMQP_TOPIC_ANNOTATION), record.topic());
  12. MessageAnnotations messageAnnotations = new MessageAnnotations(map);
  13. message.setMessageAnnotations(messageAnnotations);
  14. return message;
  15. }

代码示例来源:origin: EnMasseProject/enmasse

  1. private void forwardMessage(ProtonSender protonSender, ProtonReceiver protonReceiver, ProtonDelivery sourceDelivery, Message message) {
  2. MessageAnnotations annotations = message.getMessageAnnotations();
  3. if (annotations == null) {
  4. annotations = new MessageAnnotations(Collections.singletonMap(replicated, true));
  5. } else {
  6. annotations.getValue().put(replicated, true);
  7. }
  8. message.setMessageAnnotations(annotations);
  9. protonSender.send(message, protonDelivery -> {
  10. sourceDelivery.disposition(protonDelivery.getRemoteState(), protonDelivery.remotelySettled());
  11. protonReceiver.flow(protonSender.getCredit() - protonReceiver.getCredit());
  12. });
  13. }

代码示例来源:origin: strimzi/strimzi-kafka-bridge

  1. message.setMessageAnnotations(messageAnnotations);

代码示例来源:origin: strimzi/strimzi-kafka-bridge

  1. @Override
  2. public Message toMessage(String address, KafkaConsumerRecord<String, byte[]> record) {
  3. Message message = Proton.message();
  4. message.setAddress(address);
  5. // put message annotations about partition, offset and key (if not null)
  6. Map<Symbol, Object> map = new HashMap<>();
  7. map.put(Symbol.valueOf(AmqpBridge.AMQP_PARTITION_ANNOTATION), record.partition());
  8. map.put(Symbol.valueOf(AmqpBridge.AMQP_OFFSET_ANNOTATION), record.offset());
  9. map.put(Symbol.valueOf(AmqpBridge.AMQP_KEY_ANNOTATION), record.key());
  10. map.put(Symbol.valueOf(AmqpBridge.AMQP_TOPIC_ANNOTATION), record.topic());
  11. MessageAnnotations messageAnnotations = new MessageAnnotations(map);
  12. message.setMessageAnnotations(messageAnnotations);
  13. message.setBody(new Data(new Binary(record.value())));
  14. return message;
  15. }

代码示例来源:origin: eclipse/hono

  1. final Map<Symbol, Object> annotations = new HashMap<>();
  2. annotations.put(Symbol.valueOf(MessageHelper.ANNOTATION_X_OPT_APP_CORRELATION_ID), true);
  3. message.setMessageAnnotations(new MessageAnnotations(annotations));

代码示例来源:origin: org.eclipse.hono/hono-core

  1. final Map<Symbol, Object> annotations = new HashMap<>();
  2. annotations.put(Symbol.valueOf(MessageHelper.ANNOTATION_X_OPT_APP_CORRELATION_ID), true);
  3. message.setMessageAnnotations(new MessageAnnotations(annotations));

代码示例来源:origin: Azure/azure-event-hubs-java

  1. : amqpMessage.getMessageAnnotations();
  2. messageAnnotations.getValue().put(Symbol.getSymbol(systemProperty.getKey()), systemProperty.getValue());
  3. amqpMessage.setMessageAnnotations(messageAnnotations);

代码示例来源:origin: Azure/azure-event-hubs-java

  1. batchMessage.setMessageAnnotations(firstMessage.getMessageAnnotations());

代码示例来源:origin: Azure/azure-service-bus-java

  1. batchMessage.setMessageAnnotations(firstMessage.getMessageAnnotations());

代码示例来源:origin: EnMasseProject/enmasse

  1. /**
  2. * Return a raw AMQP message
  3. *
  4. * @return
  5. */
  6. public Message toAmqp() {
  7. Message message = ProtonHelper.message();
  8. message.setSubject(AMQP_SUBJECT);
  9. Map<Symbol, Object> map = new HashMap<>();
  10. map.put(Symbol.valueOf(AMQP_RETAIN_ANNOTATION), this.isRetain);
  11. map.put(Symbol.valueOf(AMQP_QOS_ANNOTATION), this.qos.value());
  12. MessageAnnotations messageAnnotations = new MessageAnnotations(map);
  13. message.setMessageAnnotations(messageAnnotations);
  14. message.setAddress(this.topic);
  15. Header header = new Header();
  16. header.setDurable(this.qos != MqttQoS.AT_MOST_ONCE);
  17. message.setHeader(header);
  18. // the payload could be null (or empty)
  19. if (this.payload != null)
  20. message.setBody(new Data(new Binary(this.payload.getBytes())));
  21. return message;
  22. }

代码示例来源:origin: EnMasseProject/enmasse

  1. /**
  2. * Return a raw AMQP message
  3. *
  4. * @return
  5. */
  6. public Message toAmqp() {
  7. Message message = ProtonHelper.message();
  8. message.setSubject(AMQP_SUBJECT);
  9. Map<Symbol, Object> map = new HashMap<>();
  10. map.put(Symbol.valueOf(AMQP_RETAIN_ANNOTATION), this.isRetain);
  11. map.put(Symbol.valueOf(AMQP_QOS_ANNOTATION), this.qos.value());
  12. MessageAnnotations messageAnnotations = new MessageAnnotations(map);
  13. message.setMessageAnnotations(messageAnnotations);
  14. message.setAddress(this.topic);
  15. Header header = new Header();
  16. header.setDurable(this.qos != MqttQoS.AT_MOST_ONCE);
  17. message.setHeader(header);
  18. // the payload could be null (or empty)
  19. if (this.payload != null)
  20. message.setBody(new Data(new Binary(this.payload.getBytes())));
  21. return message;
  22. }

代码示例来源:origin: EnMasseProject/enmasse

  1. /**
  2. * Return a raw AMQP message
  3. *
  4. * @return
  5. */
  6. public Message toAmqp() {
  7. Message message = ProtonHelper.message();
  8. Map<Symbol, Object> map = new HashMap<>();
  9. map.put(Symbol.valueOf(AMQP_RETAIN_ANNOTATION), this.isRetain);
  10. map.put(Symbol.valueOf(AMQP_QOS_ANNOTATION), this.qos.value());
  11. MessageAnnotations messageAnnotations = new MessageAnnotations(map);
  12. message.setMessageAnnotations(messageAnnotations);
  13. message.setAddress(this.topic);
  14. Header header = new Header();
  15. header.setDurable(this.qos != MqttQoS.AT_MOST_ONCE);
  16. message.setHeader(header);
  17. message.setDeliveryCount(this.isDup ? 1 : 0);
  18. // the payload could be null (or empty)
  19. if (this.payload != null)
  20. message.setBody(new Data(new Binary(this.payload.getBytes())));
  21. return message;
  22. }

代码示例来源:origin: EnMasseProject/enmasse

  1. /**
  2. * Return a raw AMQP message
  3. *
  4. * @return
  5. */
  6. public Message toAmqp() {
  7. Message message = ProtonHelper.message();
  8. message.setMessageId(this.messageId);
  9. Map<Symbol, Object> map = new HashMap<>();
  10. map.put(Symbol.valueOf(AMQP_RETAIN_ANNOTATION), this.isRetain);
  11. map.put(Symbol.valueOf(AMQP_QOS_ANNOTATION), this.qos.value());
  12. MessageAnnotations messageAnnotations = new MessageAnnotations(map);
  13. message.setMessageAnnotations(messageAnnotations);
  14. message.setAddress(this.topic);
  15. Header header = new Header();
  16. header.setDurable(this.qos != MqttQoS.AT_MOST_ONCE);
  17. message.setHeader(header);
  18. message.setDeliveryCount(this.isDup ? 1 : 0);
  19. // the payload could be null (or empty)
  20. if (this.payload != null)
  21. message.setBody(new Data(new Binary(this.payload.getBytes())));
  22. return message;
  23. }

代码示例来源:origin: io.vertx/vertx-amqp-bridge

  1. protonMsg.setMessageAnnotations(ma);

代码示例来源:origin: Azure/azure-service-bus-java

  1. amqpMessage.setMessageAnnotations(new MessageAnnotations(messageAnnotationsMap));

代码示例来源:origin: io.vertx/vertx-amqp-bridge

  1. @Test
  2. public void testAMQP_to_JSON_VerifyMessageAnnotations() {
  3. Map<Symbol, Object> annotations = new HashMap<>();
  4. MessageAnnotations ma = new MessageAnnotations(annotations);
  5. String testAnnKeyNameA = "testAnnKeyA";
  6. String testAnnKeyNameB = "testAnnKeyB";
  7. Symbol testAnnKeyA = Symbol.valueOf(testAnnKeyNameA);
  8. String testAnnValueA = "testAnnValueA";
  9. Symbol testAnnKeyB = Symbol.valueOf(testAnnKeyNameB);
  10. String testAnnValueB = "testAnnValueB";
  11. annotations.put(testAnnKeyA, testAnnValueA);
  12. annotations.put(testAnnKeyB, testAnnValueB);
  13. Message protonMsg = Proton.message();
  14. protonMsg.setMessageAnnotations(ma);
  15. JsonObject jsonObject = translator.convertToJsonObject(protonMsg);
  16. assertNotNull("expected converted msg", jsonObject);
  17. assertTrue("expected message annotations element key to be present",
  18. jsonObject.containsKey(AmqpConstants.MESSAGE_ANNOTATIONS));
  19. JsonObject jsonMsgAnn = jsonObject.getJsonObject(AmqpConstants.MESSAGE_ANNOTATIONS);
  20. assertNotNull("expected message annotations element value to be non-null", jsonMsgAnn);
  21. assertTrue("expected key to be present", jsonMsgAnn.containsKey(testAnnKeyNameA));
  22. assertEquals("expected value to be equal", testAnnValueA, jsonMsgAnn.getValue(testAnnKeyNameA));
  23. assertTrue("expected key to be present", jsonMsgAnn.containsKey(testAnnKeyNameB));
  24. assertEquals("expected value to be equal", testAnnValueB, jsonMsgAnn.getValue(testAnnKeyNameB));
  25. }

相关文章