org.apache.qpid.proton.message.Message.getCorrelationId()方法的使用及代码示例

x33g5p2x  于2022-01-25 转载在 其他  
字(12.2k)|赞(0)|评价(0)|浏览(194)

本文整理了Java中org.apache.qpid.proton.message.Message.getCorrelationId()方法的一些代码示例,展示了Message.getCorrelationId()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Message.getCorrelationId()方法的具体详情如下:
包路径:org.apache.qpid.proton.message.Message
类名称:Message
方法名:getCorrelationId

Message.getCorrelationId介绍

暂无

代码示例

代码示例来源:origin: org.eclipse.hono/hono-server

  1. /**
  2. * @param request the request message from which to extract the correlationId
  3. * @return The ID used to correlate the given request message. This can either be the provided correlationId
  4. * (Correlation ID Pattern) or the messageId of the request (Message ID Pattern, if no correlationId is provided).
  5. */
  6. private Object getCorrelationId(final Message request) {
  7. /* if a correlationId is provided, we use it to correlate the response -> Correlation ID Pattern */
  8. if (request.getCorrelationId() != null) {
  9. return request.getCorrelationId();
  10. } else {
  11. /* otherwise we use the message id -> Message ID Pattern */
  12. return request.getMessageId();
  13. }
  14. }
  15. }

代码示例来源:origin: org.eclipse.hono/hono-core

  1. /**
  2. * Checks if an AMQP message contains either a message ID or a correlation ID.
  3. *
  4. * @param msg The message.
  5. * @return {@code true} if the message has an ID that can be used for correlation.
  6. */
  7. protected static final boolean hasCorrelationId(final Message msg) {
  8. if (msg.getMessageId() == null && msg.getCorrelationId() == null) {
  9. LOG.trace("message has neither a message-id nor correlation-id");
  10. return false;
  11. } else {
  12. return true;
  13. }
  14. }
  15. }

代码示例来源:origin: eclipse/hono

  1. /**
  2. * Checks if an AMQP message contains either a message ID or a correlation ID.
  3. *
  4. * @param msg The message.
  5. * @return {@code true} if the message has an ID that can be used for correlation.
  6. */
  7. protected static final boolean hasCorrelationId(final Message msg) {
  8. if (msg.getMessageId() == null && msg.getCorrelationId() == null) {
  9. LOG.trace("message has neither a message-id nor correlation-id");
  10. return false;
  11. } else {
  12. return true;
  13. }
  14. }
  15. }

代码示例来源:origin: EnMasseProject/enmasse

  1. /**
  2. * Return an AMQP_CLOSE message from the raw AMQP one
  3. *
  4. * @param message raw AMQP message
  5. * @return AMQP_CLOSE message
  6. */
  7. public static AmqpCloseMessage from(Message message) {
  8. if (!message.getSubject().equals(AMQP_SUBJECT)) {
  9. throw new IllegalArgumentException(String.format("AMQP message subject is no %s", AMQP_SUBJECT));
  10. }
  11. return new AmqpCloseMessage(AmqpHelper.getClientIdFromPublishAddress((String) message.getCorrelationId()));
  12. }

代码示例来源:origin: EnMasseProject/enmasse

  1. /**
  2. * Return an AMQP_LIST message from the raw AMQP one
  3. *
  4. * @param message raw AMQP message
  5. * @return AMQP_LIST message
  6. */
  7. public static AmqpListMessage from(Message message) {
  8. if (!message.getSubject().equals(AMQP_SUBJECT)) {
  9. throw new IllegalArgumentException(String.format("AMQP message subject is no %s", AMQP_SUBJECT));
  10. }
  11. return new AmqpListMessage(AmqpHelper.getClientIdFromPublishAddress((String) message.getCorrelationId()));
  12. }

代码示例来源:origin: org.eclipse.hono/hono-core

  1. /**
  2. * Adds a property for the correlation identifier.
  3. * <p>
  4. * The value of the property is set
  5. * <ol>
  6. * <li>to the AMQP message's correlation identifier, if not {@code null}, or<li>
  7. * <li>to the AMQP message's message identifier, if not {@code null}.</li>
  8. * </ol>
  9. *
  10. * @param message The AMQP message to retrieve the value from.
  11. * @return This message for chaining.
  12. * @throws IllegalArgumentException if the message doesn't contain a correlation id
  13. * nor a message id.
  14. */
  15. public EventBusMessage setCorrelationId(final Message message) {
  16. if (message.getCorrelationId() != null) {
  17. return setCorrelationId(message.getCorrelationId());
  18. } else if (message.getMessageId() != null) {
  19. return setCorrelationId(message.getMessageId());
  20. } else {
  21. throw new IllegalArgumentException("message does not contain message-id nor correlation-id");
  22. }
  23. }

代码示例来源:origin: eclipse/hono

  1. /**
  2. * Adds a property for the correlation identifier.
  3. * <p>
  4. * The value of the property is set
  5. * <ol>
  6. * <li>to the AMQP message's correlation identifier, if not {@code null}, or<li>
  7. * <li>to the AMQP message's message identifier, if not {@code null}.</li>
  8. * </ol>
  9. *
  10. * @param message The AMQP message to retrieve the value from.
  11. * @return This message for chaining.
  12. * @throws IllegalArgumentException if the message doesn't contain a correlation id
  13. * nor a message id.
  14. */
  15. public EventBusMessage setCorrelationId(final Message message) {
  16. if (message.getCorrelationId() != null) {
  17. return setCorrelationId(message.getCorrelationId());
  18. } else if (message.getMessageId() != null) {
  19. return setCorrelationId(message.getMessageId());
  20. } else {
  21. throw new IllegalArgumentException("message does not contain message-id nor correlation-id");
  22. }
  23. }

代码示例来源:origin: EnMasseProject/enmasse

  1. /**
  2. * Return an AMQP_UNSUBSCRIBE message from the raw AMQP one
  3. *
  4. * @param message raw AMQP message
  5. * @return AMQP_UNSUBSCRIBE message
  6. */
  7. @SuppressWarnings("unchecked")
  8. public static AmqpUnsubscribeMessage from(Message message) {
  9. if (!message.getSubject().equals(AMQP_SUBJECT)) {
  10. throw new IllegalArgumentException(String.format("AMQP message subject is no %s", AMQP_SUBJECT));
  11. }
  12. Section section = message.getBody();
  13. if ((section != null) && (section instanceof AmqpValue)) {
  14. List<String> topics = (List<String>) ((AmqpValue) section).getValue();
  15. return new AmqpUnsubscribeMessage(AmqpHelper.getClientIdFromPublishAddress((String) message.getCorrelationId()),
  16. topics);
  17. } else {
  18. throw new IllegalArgumentException("AMQP message wrong body type");
  19. }
  20. }

代码示例来源:origin: EnMasseProject/enmasse

  1. /**
  2. * Return an AMQP_SUBSCRIBE message from the raw AMQP one
  3. *
  4. * @param message raw AMQP message
  5. * @return AMQP_SUBSCRIBE message
  6. */
  7. @SuppressWarnings("unchecked")
  8. public static AmqpSubscribeMessage from(Message message) {
  9. if (!message.getSubject().equals(AMQP_SUBJECT)) {
  10. throw new IllegalArgumentException(String.format("AMQP message subject is no %s", AMQP_SUBJECT));
  11. }
  12. Section section = message.getBody();
  13. if ((section != null) && (section instanceof AmqpValue)) {
  14. Map<String, String> map = (Map<String, String>) ((AmqpValue) section).getValue();
  15. // build the unique topic subscriptions list
  16. List<AmqpTopicSubscription> topicSubscriptions = new ArrayList<>();
  17. for (Map.Entry<String, String> entry: map.entrySet()) {
  18. topicSubscriptions.add(new AmqpTopicSubscription(entry.getKey(), MqttQoS.valueOf(Integer.valueOf(entry.getValue()))));
  19. }
  20. return new AmqpSubscribeMessage(AmqpHelper.getClientIdFromPublishAddress((String) message.getCorrelationId()),
  21. topicSubscriptions);
  22. } else {
  23. throw new IllegalArgumentException("AMQP message wrong body type");
  24. }
  25. }

代码示例来源:origin: Azure/azure-event-hubs-java

  1. @Override
  2. public void onReceiveComplete(Delivery delivery) {
  3. final Message response = Proton.message();
  4. final int msgSize = delivery.pending();
  5. final byte[] buffer = new byte[msgSize];
  6. final int read = receiveLink.recv(buffer, 0, msgSize);
  7. response.decode(buffer, 0, read);
  8. delivery.settle();
  9. final OperationResult<Message, Exception> responseCallback = inflightRequests.remove(response.getCorrelationId());
  10. if (responseCallback != null)
  11. responseCallback.onComplete(response);
  12. }

代码示例来源:origin: eclipse/hono

  1. /**
  2. * Checks whether a given tenant message contains all required properties.
  3. *
  4. * @param linkTarget The resource path to check the message's properties against for consistency.
  5. * @param msg The AMQP 1.0 message to perform the checks on.
  6. * @return {@code true} if the message passes all checks.
  7. */
  8. public static boolean verify(final ResourceIdentifier linkTarget, final Message msg) {
  9. if (msg.getMessageId() == null && msg.getCorrelationId() == null) {
  10. LOG.trace("message has neither a message-id nor correlation-id");
  11. return false;
  12. } else if (msg.getSubject() == null) {
  13. LOG.trace("message [{}] does not contain subject", msg.getMessageId());
  14. return false;
  15. } else if (msg.getReplyTo() == null) {
  16. LOG.trace("message [{}] contains no reply-to address", msg.getMessageId());
  17. return false;
  18. } else if (msg.getBody() != null && !MessageHelper.hasDataBody(msg)) {
  19. LOG.trace("message [{}] contains no Data section payload", msg.getMessageId());
  20. return false;
  21. } else {
  22. return true;
  23. }
  24. }
  25. }

代码示例来源:origin: Azure/azure-service-bus-java

  1. String requestMessageId = (String)finalResponseMessage.getCorrelationId();
  2. if(requestMessageId != null)

代码示例来源:origin: org.eclipse.hono/hono-client

  1. final TriTuple<Handler<AsyncResult<R>>, Object, Span> handler = replyMap.remove(message.getCorrelationId());
  2. replyToAddress, message.getCorrelationId());
  3. ProtonHelper.rejected(delivery, true);
  4. } else {
  5. if (response == null) {
  6. LOG.debug("discarding malformed response lacking status code [reply-to: {}, correlation ID: {}]",
  7. replyToAddress, message.getCorrelationId());
  8. TracingHelper.logError(span, "response from peer released (no status code)");
  9. ProtonHelper.released(delivery, true);
  10. } else {
  11. LOG.debug("received response [reply-to: {}, subject: {}, correlation ID: {}, status: {}]",
  12. replyToAddress, message.getSubject(), message.getCorrelationId(), response.getStatus());
  13. addToCache(handler.two(), response);
  14. if (span != null) {

代码示例来源:origin: eclipse/hono

  1. /**
  2. * Checks whether a given credentials message contains all required properties.
  3. *
  4. * @param linkTarget The resource path to check the message's properties against for consistency.
  5. * @param msg The AMQP 1.0 message to perform the checks on.
  6. * @return {@code true} if the message passes all checks.
  7. */
  8. public static boolean verify(final ResourceIdentifier linkTarget, final Message msg) {
  9. if (msg.getMessageId() == null && msg.getCorrelationId() == null) {
  10. LOG.trace("message has neither a message-id nor correlation-id");
  11. return false;
  12. } else if (!CredentialsConstants.CredentialsAction.isValid(msg.getSubject())) {
  13. LOG.trace("message [{}] does not contain valid subject property", msg.getMessageId());
  14. return false;
  15. } else if (msg.getReplyTo() == null) {
  16. LOG.trace("message [{}] has no reply-to address set", msg.getMessageId());
  17. return false;
  18. } else if (!MessageHelper.hasDataBody(msg)) {
  19. LOG.trace("message [{}] contains no Data section payload", msg.getMessageId());
  20. return false;
  21. } else {
  22. return true;
  23. }
  24. }

代码示例来源:origin: org.eclipse.hono/hono-service-base

  1. /**
  2. * Checks whether a given credentials message contains all required properties.
  3. *
  4. * @param linkTarget The resource path to check the message's properties against for consistency.
  5. * @param msg The AMQP 1.0 message to perform the checks on.
  6. * @return {@code true} if the message passes all checks.
  7. */
  8. public static boolean verify(final ResourceIdentifier linkTarget, final Message msg) {
  9. if (msg.getMessageId() == null && msg.getCorrelationId() == null) {
  10. LOG.trace("message has neither a message-id nor correlation-id");
  11. return false;
  12. } else if (!CredentialsConstants.CredentialsAction.isValid(msg.getSubject())) {
  13. LOG.trace("message [{}] does not contain valid subject property", msg.getMessageId());
  14. return false;
  15. } else if (msg.getReplyTo() == null) {
  16. LOG.trace("message [{}] has no reply-to address set", msg.getMessageId());
  17. return false;
  18. } else if (!MessageHelper.hasDataBody(msg, true)) {
  19. LOG.trace("message [{}] contains no AmqpValue or Data section payload", msg.getMessageId());
  20. return false;
  21. } else {
  22. return true;
  23. }
  24. }

代码示例来源:origin: org.eclipse.hono/hono-service-base

  1. /**
  2. * Checks whether a given tenant message contains all required properties.
  3. *
  4. * @param linkTarget The resource path to check the message's properties against for consistency.
  5. * @param msg The AMQP 1.0 message to perform the checks on.
  6. * @return {@code true} if the message passes all checks.
  7. */
  8. public static boolean verify(final ResourceIdentifier linkTarget, final Message msg) {
  9. if (msg.getMessageId() == null && msg.getCorrelationId() == null) {
  10. LOG.trace("message has neither a message-id nor correlation-id");
  11. return false;
  12. } else if (msg.getSubject() == null) {
  13. LOG.trace("message [{}] does not contain subject", msg.getMessageId());
  14. return false;
  15. } else if (msg.getReplyTo() == null) {
  16. LOG.trace("message [{}] contains no reply-to address", msg.getMessageId());
  17. return false;
  18. } else if (msg.getBody() != null && !MessageHelper.hasDataBody(msg, true)) {
  19. LOG.trace("message [{}] contains no AmqpValue or Data section payload", msg.getMessageId());
  20. return false;
  21. } else {
  22. return true;
  23. }
  24. }
  25. }

代码示例来源:origin: org.eclipse.hono/hono-server

  1. } else if (msg.getMessageId() == null && msg.getCorrelationId() == null) {
  2. LOG.trace("message has neither a message-id nor correlation-id");
  3. return false;

代码示例来源:origin: eclipse/hono

  1. private Future<ProtonDelivery> doUploadCommandResponseMessage(final AmqpContext context, final Span currentSpan) {
  2. final String correlationId = Optional.ofNullable(context.getMessage().getCorrelationId())
  3. .map(id -> {
  4. if (id instanceof String) {

代码示例来源:origin: eclipse/hono

  1. if (msg.getCorrelationId() == null) {

代码示例来源:origin: Azure/azure-event-hubs-java

  1. if (amqpMessage.getReplyTo() != null)
  2. receiveProperties.put(AmqpConstants.AMQP_PROPERTY_REPLY_TO, amqpMessage.getReplyTo());
  3. if (amqpMessage.getCorrelationId() != null)
  4. receiveProperties.put(AmqpConstants.AMQP_PROPERTY_CORRELATION_ID, amqpMessage.getCorrelationId());
  5. if (amqpMessage.getContentType() != null)
  6. receiveProperties.put(AmqpConstants.AMQP_PROPERTY_CONTENT_TYPE, amqpMessage.getContentType());

相关文章