本文整理了Java中io.vertx.core.eventbus.Message.replyAddress()
方法的一些代码示例,展示了Message.replyAddress()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Message.replyAddress()
方法的具体详情如下:
包路径:io.vertx.core.eventbus.Message
类名称:Message
方法名:replyAddress
[英]The reply address. Can be null.
[中]回复地址。可以为空。
代码示例来源:origin: vert-x3/vertx-examples
@Override
public <T> void onMessage(Message<T> message) {
logger.info("Get a message from Vert.x: " + message.toString());
T body = message.body();
if (body != null) {
logger.info("Body of the message: " + body.toString());
if (message.replyAddress() != null) {
message.reply("Hi, Got your message: " + body.toString());
} else{
logger.info("No reply address for message. Not responding!");
}
} else {
message.reply("Hi, Got your empty message.");
}
}
代码示例来源:origin: eclipse-vertx/vert.x
@Test
public void testReplyFailureRecipientFailure() throws Exception {
CountDownLatch latch = new CountDownLatch(1);
EventBus eb = vertx.eventBus();
FakeEventBusMetrics metrics = FakeMetricsBase.getMetrics(eb);
AtomicReference<String> replyAddress = new AtomicReference<>();
CountDownLatch regLatch = new CountDownLatch(1);
eb.consumer("foo", msg -> {
replyAddress.set(msg.replyAddress());
msg.fail(0, "whatever");
}).completionHandler(onSuccess(v -> {
regLatch.countDown();
}));
awaitLatch(regLatch);
eb.send("foo", "bar", new DeliveryOptions(), ar -> {
assertTrue(ar.failed());
latch.countDown();
});
awaitLatch(latch);
assertEquals(Collections.singletonList(replyAddress.get()), metrics.getReplyFailureAddresses());
assertEquals(Collections.singletonList(ReplyFailure.RECIPIENT_FAILURE), metrics.getReplyFailures());
}
代码示例来源:origin: vert-x3/vertx-web
private void checkAddAccceptedReplyAddress(Message message) {
String replyAddress = message.replyAddress();
if (replyAddress != null) {
// This message has a reply address
// When the reply comes through we want to accept it irrespective of its address
// Since all replies are implicitly accepted if the original message was accepted
// So we cache the reply address, so we can check against it
// We also need to cache the message so we can actually call reply() on it - we need the actual message
// as the original sender could be on a different node so we need the replyDest (serverID) too otherwise
// the message won't be routed to the node.
messagesAwaitingReply.put(replyAddress, message);
// And we remove after timeout in case the reply never comes
vertx.setTimer(replyTimeout, tid -> messagesAwaitingReply.remove(replyAddress));
}
}
代码示例来源:origin: vert-x3/vertx-web
private void deliverMessage(SockJSSocket sock, String address, Message message) {
JsonObject envelope = new JsonObject().put("type", "rec").put("address", address).put("body", message.body());
if (message.replyAddress() != null) {
envelope.put("replyAddress", message.replyAddress());
}
if (message.headers() != null && !message.headers().isEmpty()) {
JsonObject headersCopy = new JsonObject();
for (String name : message.headers().names()) {
List<String> values = message.headers().getAll(name);
if (values.size() == 1) {
headersCopy.put(name, values.get(0));
} else {
headersCopy.put(name, values);
}
}
envelope.put("headers", headersCopy);
}
checkCallHook(() -> new BridgeEventImpl(BridgeEventType.RECEIVE, envelope, sock),
() -> sock.write(buffer(envelope.encode())),
() -> log.debug("outbound message rejected by bridge event handler"));
}
代码示例来源:origin: amoAHCP/vxms
/**
* Returns the reply-address to reply to the incoming message
*
* @return the reply address
*/
public String replyAddress() {
return message.replyAddress();
}
}
代码示例来源:origin: io.vertx/vertx-rx-java
/**
* The reply address. Can be null.
* @return the reply address, or null, if message was sent without a reply handler.
*/
public String replyAddress() {
String ret = delegate.replyAddress();
return ret;
}
代码示例来源:origin: vert-x3/vertx-rx
/**
* The reply address. Can be null.
* @return the reply address, or null, if message was sent without a reply handler.
*/
public String replyAddress() {
String ret = delegate.replyAddress();
return ret;
}
代码示例来源:origin: org.jboss.weld.vertx/weld-vertx-core
@Override
public String getReplyAddress() {
return message.replyAddress();
}
代码示例来源:origin: io.vertx/vertx-core
@Test
public void testReplyFailureRecipientFailure() throws Exception {
CountDownLatch latch = new CountDownLatch(1);
EventBus eb = vertx.eventBus();
FakeEventBusMetrics metrics = FakeMetricsBase.getMetrics(eb);
AtomicReference<String> replyAddress = new AtomicReference<>();
CountDownLatch regLatch = new CountDownLatch(1);
eb.consumer("foo", msg -> {
replyAddress.set(msg.replyAddress());
msg.fail(0, "whatever");
}).completionHandler(onSuccess(v -> {
regLatch.countDown();
}));
awaitLatch(regLatch);
eb.send("foo", "bar", new DeliveryOptions(), ar -> {
assertTrue(ar.failed());
latch.countDown();
});
awaitLatch(latch);
assertEquals(Collections.singletonList(replyAddress.get()), metrics.getReplyFailureAddresses());
assertEquals(Collections.singletonList(ReplyFailure.RECIPIENT_FAILURE), metrics.getReplyFailures());
}
代码示例来源:origin: weld/weld-vertx
@Override
public String getReplyAddress() {
return message.replyAddress();
}
代码示例来源:origin: io.vertx/vertx-web
private void checkAddAccceptedReplyAddress(Message message) {
String replyAddress = message.replyAddress();
if (replyAddress != null) {
// This message has a reply address
// When the reply comes through we want to accept it irrespective of its address
// Since all replies are implicitly accepted if the original message was accepted
// So we cache the reply address, so we can check against it
// We also need to cache the message so we can actually call reply() on it - we need the actual message
// as the original sender could be on a different node so we need the replyDest (serverID) too otherwise
// the message won't be routed to the node.
messagesAwaitingReply.put(replyAddress, message);
// And we remove after timeout in case the reply never comes
vertx.setTimer(replyTimeout, tid -> messagesAwaitingReply.remove(replyAddress));
}
}
代码示例来源:origin: io.vertx/vertx-sockjs
private static void deliverMessage(SockJSSocket sock, String address, Message message) {
JsonObject envelope = new JsonObject().put("address", address).put("body", message.body());
if (message.replyAddress() != null) {
envelope.put("replyAddress", message.replyAddress());
}
sock.write(buffer(envelope.encode()));
}
代码示例来源:origin: pmlopes/yoke
private void checkAddAccceptedReplyAddress(Message message) {
String replyAddress = message.replyAddress();
if (replyAddress != null) {
// This message has a reply address
// When the reply comes through we want to accept it irrespective of its address
// Since all replies are implicitly accepted if the original message was accepted
// So we cache the reply address, so we can check against it
// We also need to cache the message so we can actually call reply() on it - we need the actual message
// as the original sender could be on a different node so we need the replyDest (serverID) too otherwise
// the message won't be routed to the node.
messagesAwaitingReply.put(replyAddress, message);
// And we remove after timeout in case the reply never comes
vertx.setTimer(replyTimeout, tid -> messagesAwaitingReply.remove(replyAddress));
}
}
代码示例来源:origin: org.swisspush/mod-metrics
private void sendStatus(String status, Message<JsonObject> message, JsonObject json) {
if (json == null) {
json = new JsonObject();
}
json.put("status", status);
if(message.replyAddress() != null) {
logger.debug("replying message with status " + status);
}
message.reply(json);
}
代码示例来源:origin: weld/weld-vertx
@Override
public boolean setReply(Object reply) {
if (message.replyAddress() == null) {
LOGGER.warn("The message was sent without a reply handler - the reply will be ignored");
}
if (this.reply != null) {
LOGGER.warn("A reply was already set - the old value is replaced");
return false;
}
this.reply = reply;
return true;
}
代码示例来源:origin: org.jboss.weld.vertx/weld-vertx-core
@Override
public boolean setReply(Object reply) {
if (message.replyAddress() == null) {
LOGGER.warn("The message was sent without a reply handler - the reply will be ignored");
}
if (this.reply != null) {
LOGGER.warn("A reply was already set - the old value is replaced");
return false;
}
this.reply = reply;
return true;
}
代码示例来源:origin: io.vertx/vertx-web
private void deliverMessage(SockJSSocket sock, String address, Message message) {
JsonObject envelope = new JsonObject().put("type", "rec").put("address", address).put("body", message.body());
if (message.replyAddress() != null) {
envelope.put("replyAddress", message.replyAddress());
}
if (message.headers() != null && !message.headers().isEmpty()) {
JsonObject headersCopy = new JsonObject();
for (String name : message.headers().names()) {
List<String> values = message.headers().getAll(name);
if (values.size() == 1) {
headersCopy.put(name, values.get(0));
} else {
headersCopy.put(name, values);
}
}
envelope.put("headers", headersCopy);
}
checkCallHook(() -> new BridgeEventImpl(BridgeEventType.RECEIVE, envelope, sock),
() -> sock.write(buffer(envelope.encode())),
() -> log.debug("outbound message rejected by bridge event handler"));
}
代码示例来源:origin: pmlopes/yoke
private void deliverMessage(SockJSSocket sock, String address, Message message) {
JsonObject envelope = new JsonObject().put("type", "rec").put("address", address).put("body", message.body());
if (message.replyAddress() != null) {
envelope.put("replyAddress", message.replyAddress());
}
if ( message.headers() != null && !message.headers().isEmpty()) {
JsonObject headersCopy = new JsonObject();
for (String name : message.headers().names()) {
List<String> values = message.headers().getAll(name);
if ( values.size() == 1) {
headersCopy.put(name, values.get(0));
} else {
headersCopy.put(name, values);
}
}
envelope.put("headers", headersCopy);
}
checkCallHook(() -> new BridgeEventImpl(BridgeEventType.RECEIVE, envelope, sock),
() -> sock.write(buffer(envelope.encode())),
() -> log.debug("outbound message rejected by bridge event handler"));
}
代码示例来源:origin: io.vertx/vertx-shell
process.write("Reply address: " + msg.replyAddress() + "\n");
MultiMap headers = msg.headers();
for (String header : headers.names()) {
代码示例来源:origin: io.vertx/vertx-shell
@Override
public void process(CommandProcess process) {
Object body = parseBody();
if (reply) {
process.vertx().eventBus().send(address, body, options, ar -> {
if (ar.succeeded()) {
Message<Object> reply = ar.result();
if (verbose) {
process.write("Reply address: " + reply.replyAddress() + "\n");
MultiMap headers = reply.headers();
for (String header : headers.names()) {
process.write("Reply header " + header + ":" + headers.getAll(header) + "\n");
}
}
process.write("Reply: <");
process.write(String.valueOf(reply.body())).write(">\n");
} else {
process.write("Error: " + ar.cause().getMessage() + "\n");
}
process.end();
});
} else {
process.vertx().eventBus().send(address, body, options);
process.end();
}
}
}
内容来源于网络,如有侵权,请联系作者删除!