io.vertx.core.eventbus.Message.replyAddress()方法的使用及代码示例

x33g5p2x  于2022-01-24 转载在 其他  
字(10.5k)|赞(0)|评价(0)|浏览(187)

本文整理了Java中io.vertx.core.eventbus.Message.replyAddress()方法的一些代码示例,展示了Message.replyAddress()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Message.replyAddress()方法的具体详情如下:
包路径:io.vertx.core.eventbus.Message
类名称:Message
方法名:replyAddress

Message.replyAddress介绍

[英]The reply address. Can be null.
[中]回复地址。可以为空。

代码示例

代码示例来源:origin: vert-x3/vertx-examples

  1. @Override
  2. public <T> void onMessage(Message<T> message) {
  3. logger.info("Get a message from Vert.x: " + message.toString());
  4. T body = message.body();
  5. if (body != null) {
  6. logger.info("Body of the message: " + body.toString());
  7. if (message.replyAddress() != null) {
  8. message.reply("Hi, Got your message: " + body.toString());
  9. } else{
  10. logger.info("No reply address for message. Not responding!");
  11. }
  12. } else {
  13. message.reply("Hi, Got your empty message.");
  14. }
  15. }

代码示例来源:origin: eclipse-vertx/vert.x

  1. @Test
  2. public void testReplyFailureRecipientFailure() throws Exception {
  3. CountDownLatch latch = new CountDownLatch(1);
  4. EventBus eb = vertx.eventBus();
  5. FakeEventBusMetrics metrics = FakeMetricsBase.getMetrics(eb);
  6. AtomicReference<String> replyAddress = new AtomicReference<>();
  7. CountDownLatch regLatch = new CountDownLatch(1);
  8. eb.consumer("foo", msg -> {
  9. replyAddress.set(msg.replyAddress());
  10. msg.fail(0, "whatever");
  11. }).completionHandler(onSuccess(v -> {
  12. regLatch.countDown();
  13. }));
  14. awaitLatch(regLatch);
  15. eb.send("foo", "bar", new DeliveryOptions(), ar -> {
  16. assertTrue(ar.failed());
  17. latch.countDown();
  18. });
  19. awaitLatch(latch);
  20. assertEquals(Collections.singletonList(replyAddress.get()), metrics.getReplyFailureAddresses());
  21. assertEquals(Collections.singletonList(ReplyFailure.RECIPIENT_FAILURE), metrics.getReplyFailures());
  22. }

代码示例来源:origin: vert-x3/vertx-web

  1. private void checkAddAccceptedReplyAddress(Message message) {
  2. String replyAddress = message.replyAddress();
  3. if (replyAddress != null) {
  4. // This message has a reply address
  5. // When the reply comes through we want to accept it irrespective of its address
  6. // Since all replies are implicitly accepted if the original message was accepted
  7. // So we cache the reply address, so we can check against it
  8. // We also need to cache the message so we can actually call reply() on it - we need the actual message
  9. // as the original sender could be on a different node so we need the replyDest (serverID) too otherwise
  10. // the message won't be routed to the node.
  11. messagesAwaitingReply.put(replyAddress, message);
  12. // And we remove after timeout in case the reply never comes
  13. vertx.setTimer(replyTimeout, tid -> messagesAwaitingReply.remove(replyAddress));
  14. }
  15. }

代码示例来源:origin: vert-x3/vertx-web

  1. private void deliverMessage(SockJSSocket sock, String address, Message message) {
  2. JsonObject envelope = new JsonObject().put("type", "rec").put("address", address).put("body", message.body());
  3. if (message.replyAddress() != null) {
  4. envelope.put("replyAddress", message.replyAddress());
  5. }
  6. if (message.headers() != null && !message.headers().isEmpty()) {
  7. JsonObject headersCopy = new JsonObject();
  8. for (String name : message.headers().names()) {
  9. List<String> values = message.headers().getAll(name);
  10. if (values.size() == 1) {
  11. headersCopy.put(name, values.get(0));
  12. } else {
  13. headersCopy.put(name, values);
  14. }
  15. }
  16. envelope.put("headers", headersCopy);
  17. }
  18. checkCallHook(() -> new BridgeEventImpl(BridgeEventType.RECEIVE, envelope, sock),
  19. () -> sock.write(buffer(envelope.encode())),
  20. () -> log.debug("outbound message rejected by bridge event handler"));
  21. }

代码示例来源:origin: amoAHCP/vxms

  1. /**
  2. * Returns the reply-address to reply to the incoming message
  3. *
  4. * @return the reply address
  5. */
  6. public String replyAddress() {
  7. return message.replyAddress();
  8. }
  9. }

代码示例来源:origin: io.vertx/vertx-rx-java

  1. /**
  2. * The reply address. Can be null.
  3. * @return the reply address, or null, if message was sent without a reply handler.
  4. */
  5. public String replyAddress() {
  6. String ret = delegate.replyAddress();
  7. return ret;
  8. }

代码示例来源:origin: vert-x3/vertx-rx

  1. /**
  2. * The reply address. Can be null.
  3. * @return the reply address, or null, if message was sent without a reply handler.
  4. */
  5. public String replyAddress() {
  6. String ret = delegate.replyAddress();
  7. return ret;
  8. }

代码示例来源:origin: org.jboss.weld.vertx/weld-vertx-core

  1. @Override
  2. public String getReplyAddress() {
  3. return message.replyAddress();
  4. }

代码示例来源:origin: io.vertx/vertx-core

  1. @Test
  2. public void testReplyFailureRecipientFailure() throws Exception {
  3. CountDownLatch latch = new CountDownLatch(1);
  4. EventBus eb = vertx.eventBus();
  5. FakeEventBusMetrics metrics = FakeMetricsBase.getMetrics(eb);
  6. AtomicReference<String> replyAddress = new AtomicReference<>();
  7. CountDownLatch regLatch = new CountDownLatch(1);
  8. eb.consumer("foo", msg -> {
  9. replyAddress.set(msg.replyAddress());
  10. msg.fail(0, "whatever");
  11. }).completionHandler(onSuccess(v -> {
  12. regLatch.countDown();
  13. }));
  14. awaitLatch(regLatch);
  15. eb.send("foo", "bar", new DeliveryOptions(), ar -> {
  16. assertTrue(ar.failed());
  17. latch.countDown();
  18. });
  19. awaitLatch(latch);
  20. assertEquals(Collections.singletonList(replyAddress.get()), metrics.getReplyFailureAddresses());
  21. assertEquals(Collections.singletonList(ReplyFailure.RECIPIENT_FAILURE), metrics.getReplyFailures());
  22. }

代码示例来源:origin: weld/weld-vertx

  1. @Override
  2. public String getReplyAddress() {
  3. return message.replyAddress();
  4. }

代码示例来源:origin: io.vertx/vertx-web

  1. private void checkAddAccceptedReplyAddress(Message message) {
  2. String replyAddress = message.replyAddress();
  3. if (replyAddress != null) {
  4. // This message has a reply address
  5. // When the reply comes through we want to accept it irrespective of its address
  6. // Since all replies are implicitly accepted if the original message was accepted
  7. // So we cache the reply address, so we can check against it
  8. // We also need to cache the message so we can actually call reply() on it - we need the actual message
  9. // as the original sender could be on a different node so we need the replyDest (serverID) too otherwise
  10. // the message won't be routed to the node.
  11. messagesAwaitingReply.put(replyAddress, message);
  12. // And we remove after timeout in case the reply never comes
  13. vertx.setTimer(replyTimeout, tid -> messagesAwaitingReply.remove(replyAddress));
  14. }
  15. }

代码示例来源:origin: io.vertx/vertx-sockjs

  1. private static void deliverMessage(SockJSSocket sock, String address, Message message) {
  2. JsonObject envelope = new JsonObject().put("address", address).put("body", message.body());
  3. if (message.replyAddress() != null) {
  4. envelope.put("replyAddress", message.replyAddress());
  5. }
  6. sock.write(buffer(envelope.encode()));
  7. }

代码示例来源:origin: pmlopes/yoke

  1. private void checkAddAccceptedReplyAddress(Message message) {
  2. String replyAddress = message.replyAddress();
  3. if (replyAddress != null) {
  4. // This message has a reply address
  5. // When the reply comes through we want to accept it irrespective of its address
  6. // Since all replies are implicitly accepted if the original message was accepted
  7. // So we cache the reply address, so we can check against it
  8. // We also need to cache the message so we can actually call reply() on it - we need the actual message
  9. // as the original sender could be on a different node so we need the replyDest (serverID) too otherwise
  10. // the message won't be routed to the node.
  11. messagesAwaitingReply.put(replyAddress, message);
  12. // And we remove after timeout in case the reply never comes
  13. vertx.setTimer(replyTimeout, tid -> messagesAwaitingReply.remove(replyAddress));
  14. }
  15. }

代码示例来源:origin: org.swisspush/mod-metrics

  1. private void sendStatus(String status, Message<JsonObject> message, JsonObject json) {
  2. if (json == null) {
  3. json = new JsonObject();
  4. }
  5. json.put("status", status);
  6. if(message.replyAddress() != null) {
  7. logger.debug("replying message with status " + status);
  8. }
  9. message.reply(json);
  10. }

代码示例来源:origin: weld/weld-vertx

  1. @Override
  2. public boolean setReply(Object reply) {
  3. if (message.replyAddress() == null) {
  4. LOGGER.warn("The message was sent without a reply handler - the reply will be ignored");
  5. }
  6. if (this.reply != null) {
  7. LOGGER.warn("A reply was already set - the old value is replaced");
  8. return false;
  9. }
  10. this.reply = reply;
  11. return true;
  12. }

代码示例来源:origin: org.jboss.weld.vertx/weld-vertx-core

  1. @Override
  2. public boolean setReply(Object reply) {
  3. if (message.replyAddress() == null) {
  4. LOGGER.warn("The message was sent without a reply handler - the reply will be ignored");
  5. }
  6. if (this.reply != null) {
  7. LOGGER.warn("A reply was already set - the old value is replaced");
  8. return false;
  9. }
  10. this.reply = reply;
  11. return true;
  12. }

代码示例来源:origin: io.vertx/vertx-web

  1. private void deliverMessage(SockJSSocket sock, String address, Message message) {
  2. JsonObject envelope = new JsonObject().put("type", "rec").put("address", address).put("body", message.body());
  3. if (message.replyAddress() != null) {
  4. envelope.put("replyAddress", message.replyAddress());
  5. }
  6. if (message.headers() != null && !message.headers().isEmpty()) {
  7. JsonObject headersCopy = new JsonObject();
  8. for (String name : message.headers().names()) {
  9. List<String> values = message.headers().getAll(name);
  10. if (values.size() == 1) {
  11. headersCopy.put(name, values.get(0));
  12. } else {
  13. headersCopy.put(name, values);
  14. }
  15. }
  16. envelope.put("headers", headersCopy);
  17. }
  18. checkCallHook(() -> new BridgeEventImpl(BridgeEventType.RECEIVE, envelope, sock),
  19. () -> sock.write(buffer(envelope.encode())),
  20. () -> log.debug("outbound message rejected by bridge event handler"));
  21. }

代码示例来源:origin: pmlopes/yoke

  1. private void deliverMessage(SockJSSocket sock, String address, Message message) {
  2. JsonObject envelope = new JsonObject().put("type", "rec").put("address", address).put("body", message.body());
  3. if (message.replyAddress() != null) {
  4. envelope.put("replyAddress", message.replyAddress());
  5. }
  6. if ( message.headers() != null && !message.headers().isEmpty()) {
  7. JsonObject headersCopy = new JsonObject();
  8. for (String name : message.headers().names()) {
  9. List<String> values = message.headers().getAll(name);
  10. if ( values.size() == 1) {
  11. headersCopy.put(name, values.get(0));
  12. } else {
  13. headersCopy.put(name, values);
  14. }
  15. }
  16. envelope.put("headers", headersCopy);
  17. }
  18. checkCallHook(() -> new BridgeEventImpl(BridgeEventType.RECEIVE, envelope, sock),
  19. () -> sock.write(buffer(envelope.encode())),
  20. () -> log.debug("outbound message rejected by bridge event handler"));
  21. }

代码示例来源:origin: io.vertx/vertx-shell

  1. process.write("Reply address: " + msg.replyAddress() + "\n");
  2. MultiMap headers = msg.headers();
  3. for (String header : headers.names()) {

代码示例来源:origin: io.vertx/vertx-shell

  1. @Override
  2. public void process(CommandProcess process) {
  3. Object body = parseBody();
  4. if (reply) {
  5. process.vertx().eventBus().send(address, body, options, ar -> {
  6. if (ar.succeeded()) {
  7. Message<Object> reply = ar.result();
  8. if (verbose) {
  9. process.write("Reply address: " + reply.replyAddress() + "\n");
  10. MultiMap headers = reply.headers();
  11. for (String header : headers.names()) {
  12. process.write("Reply header " + header + ":" + headers.getAll(header) + "\n");
  13. }
  14. }
  15. process.write("Reply: <");
  16. process.write(String.valueOf(reply.body())).write(">\n");
  17. } else {
  18. process.write("Error: " + ar.cause().getMessage() + "\n");
  19. }
  20. process.end();
  21. });
  22. } else {
  23. process.vertx().eventBus().send(address, body, options);
  24. process.end();
  25. }
  26. }
  27. }

相关文章