io.vertx.core.eventbus.Message类的使用及代码示例

x33g5p2x  于2022-01-24 转载在 其他  
字(12.1k)|赞(0)|评价(0)|浏览(174)

本文整理了Java中io.vertx.core.eventbus.Message类的一些代码示例,展示了Message类的具体用法。这些代码示例主要来源于GitHub/Stack Overflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。Message类的具体详情如下:
包路径:io.vertx.core.eventbus.Message
类名称:Message

Message介绍

[英]Represents a message that is received from the event bus in a handler.

Messages have a #body, which can be null, and also #headers, which can be empty.

If the message was sent specifying a reply handler, it can be replied to using #reply.

If you want to notify the sender that processing failed, then #fail can be called.
[中]表示在处理程序中从事件总线接收到的消息。
消息有#body(可以为null)和#headers(可以为空)。
如果发送消息时指定了回复处理程序,则可以使用#reply进行回复。
如果要通知发送方处理失败,则可以调用#fail。

代码示例

代码示例来源:origin: vert-x3/vertx-examples

  1. @GET
  2. @Path("/products")
  3. @Produces({MediaType.APPLICATION_JSON})
  4. public void list(
  5. // Suspend the request
  6. @Suspended final AsyncResponse asyncResponse,
  7. // Inject the Vertx instance
  8. @Context Vertx vertx) {
  9. // Send a list message to the backend
  10. vertx.eventBus().<JsonArray>send("backend", new JsonObject().put("op", "list"), msg -> {
  11. // When we get the response we resume the Jax-RS async response
  12. if (msg.succeeded()) {
  13. JsonArray json = msg.result().body();
  14. if (json != null) {
  15. asyncResponse.resume(json.encode());
  16. } else {
  17. asyncResponse.resume(Response.status(Response.Status.NOT_FOUND).build());
  18. }
  19. } else {
  20. asyncResponse.resume(Response.status(Response.Status.INTERNAL_SERVER_ERROR).build());
  21. }
  22. });
  23. }
  24. }

代码示例来源:origin: eclipse-vertx/vert.x

  1. @Test
  2. public void testSendWithTimeoutNoTimeoutAfterReply() {
  3. String str = TestUtils.randomUnicodeString(1000);
  4. long timeout = 1000;
  5. eb.<String>consumer(ADDRESS1).handler((Message<String> msg) -> {
  6. assertEquals(str, msg.body());
  7. msg.reply("a reply");
  8. });
  9. AtomicBoolean received = new AtomicBoolean();
  10. eb.send(ADDRESS1, str, new DeliveryOptions().setSendTimeout(timeout), (AsyncResult<Message<Integer>> ar) -> {
  11. assertFalse(received.get());
  12. assertTrue(ar.succeeded());
  13. received.set(true);
  14. // Now wait longer than timeout and make sure we don't receive any other reply
  15. vertx.setTimer(timeout * 2, tid -> {
  16. testComplete();
  17. });
  18. });
  19. await();
  20. }

代码示例来源:origin: eclipse-vertx/vert.x

  1. MessageConsumer<?> reg = vertices[1].eventBus().consumer(ADDRESS1).handler(msg -> {
  2. assertEquals(str, msg.body());
  3. if (options == null) {
  4. msg.reply(val);
  5. } else {
  6. msg.reply(val, options);
  7. reg.completionHandler(ar -> {
  8. assertTrue(ar.succeeded());
  9. vertices[0].eventBus().send(ADDRESS1, str, onSuccess((Message<R> reply) -> {
  10. if (consumer == null) {
  11. assertTrue(reply.isSend());
  12. assertEquals(received, reply.body());
  13. if (options != null && options.getHeaders() != null) {
  14. assertNotNull(reply.headers());
  15. assertEquals(options.getHeaders().size(), reply.headers().size());
  16. for (Map.Entry<String, String> entry: options.getHeaders().entries()) {
  17. assertEquals(reply.headers().get(entry.getKey()), entry.getValue());
  18. consumer.accept(reply.body());

代码示例来源:origin: vert-x3/vertx-examples

  1. @Override
  2. public void start(Future<Void> startFuture) throws Exception {
  3. vertx.eventBus().<String>consumer("hello", message -> {
  4. message.reply("Hello " + message.body() + " from " + ID);
  5. }).completionHandler(startFuture);
  6. }
  7. }

代码示例来源:origin: vert-x3/vertx-examples

  1. private void registerHandler() {
  2. MessageConsumer<JsonObject> messageConsumer = eventBus.consumer(verticleAddress);
  3. messageConsumer.handler(message -> {
  4. JsonObject jsonMessage = message.body();
  5. System.out.println(jsonMessage.getValue("message_from_sender_verticle"));
  6. JsonObject jsonReply = new JsonObject().put("reply", "how interesting!");
  7. message.reply(jsonReply);
  8. });
  9. }
  10. }

代码示例来源:origin: vert-x3/vertx-examples

  1. private void placeOrder(Message<JsonObject> msg) {
  2. mongo.save("orders", msg.body(), save -> {
  3. // error handling
  4. if (save.failed()) {
  5. msg.fail(500, save.cause().getMessage());
  6. return;
  7. }
  8. msg.reply(new JsonObject());
  9. });
  10. }

代码示例来源:origin: eclipse-vertx/vert.x

  1. @Test
  2. public void testChangesNotVisibleObject3() {
  3. Map<String, Object> map = new HashMap<>();
  4. final JsonObject obj = new JsonObject(map);
  5. eb.<JsonObject>consumer("foo").handler((Message<JsonObject> msg) -> {
  6. vertx.setTimer(1000, id -> {
  7. assertFalse(msg.body().containsKey("b"));
  8. testComplete();
  9. });
  10. });
  11. eb.send("foo", obj);
  12. map.put("b", "uhqdihuqwd");
  13. await();
  14. }

代码示例来源:origin: vert-x3/vertx-web

  1. private void testSend(String address, Object body, boolean headers) throws Exception {
  2. CountDownLatch latch = new CountDownLatch(1);
  3. client.websocket(websocketURI, ws -> {
  4. MessageConsumer<Object> consumer = vertx.eventBus().consumer(address);
  5. consumer.handler(msg -> {
  6. Object receivedBody = msg.body();
  7. assertEquals(body, receivedBody);
  8. if (headers) {
  9. checkHeaders(msg);
  10. }
  11. consumer.unregister(v -> latch.countDown());
  12. });
  13. JsonObject msg = new JsonObject().put("type", "send").put("address", address).put("body", body);
  14. ws.writeFrame(io.vertx.core.http.WebSocketFrame.textFrame(msg.encode(), true));
  15. });
  16. awaitLatch(latch);
  17. }

代码示例来源:origin: xenv/gushici

  1. private void handleRoot(RoutingContext routingContext) {
  2. JsonObject result = new JsonObject();
  3. result.put("welcome", "欢迎使用古诗词·一言");
  4. result.put("api-document", "下面为本API可用的所有类型,使用时,在链接最后面加上 .svg / .txt / .json / .png 可以获得不同格式的输出");
  5. result.put("help", "具体安装方法请访问项目首页 " + config().getString("index.url", "http://localhost/"));
  6. vertx.eventBus().<JsonArray>send(Key.GET_HELP_FROM_REDIS, null, res -> {
  7. if (res.succeeded()) {
  8. result.put("list", res.result().body());
  9. returnJsonWithCache(routingContext, result);
  10. } else {
  11. routingContext.fail(res.cause());
  12. }
  13. });
  14. }

代码示例来源:origin: vert-x3/vertx-examples

  1. private void sendMessage() {
  2. JsonObject jsonMessage = new JsonObject().put("message_from_sender_verticle", "hello consumer");
  3. eventBus.send("Consumer", jsonMessage, messageAsyncResult -> {
  4. if(messageAsyncResult.succeeded()) {
  5. JsonObject jsonReply = (JsonObject) messageAsyncResult.result().body();
  6. System.out.println("received reply: " + jsonReply.getValue("reply"));
  7. }
  8. });
  9. }
  10. }

代码示例来源:origin: vert-x3/vertx-examples

  1. @Override
  2. public void start(Future<Void> startFuture) throws Exception {
  3. HttpServerOptions options = new HttpServerOptions().setPort(config().getInteger("port"));
  4. vertx.createHttpServer(options).requestHandler(request -> {
  5. String name = request.getParam("name");
  6. if (name == null) {
  7. request.response().setStatusCode(400).end("Missing name");
  8. } else {
  9. vertx.eventBus().<String>send("hello", name, ar -> {
  10. if (ar.succeeded()) {
  11. request.response().end(ar.result().body());
  12. } else {
  13. request.response().setStatusCode(500).end(ar.cause().getMessage());
  14. }
  15. });
  16. }
  17. }).listen(ar -> {
  18. if (ar.succeeded()) {
  19. startFuture.complete();
  20. } else {
  21. startFuture.fail(ar.cause());
  22. }
  23. });
  24. }
  25. }

代码示例来源:origin: eclipse-vertx/vert.x

  1. private void testIsolationGroup(String group1, String group2, int count1, int count2, List<String> isolatedClasses,
  2. String verticleID) throws Exception {
  3. Map<String, Integer> countMap = new ConcurrentHashMap<>();
  4. vertx.eventBus().<JsonObject>consumer("testcounts").handler((Message<JsonObject> msg) -> {
  5. countMap.put(msg.body().getString("deploymentID"), msg.body().getInteger("count"));
  6. });
  7. CountDownLatch latch = new CountDownLatch(1);
  8. boolean expectedSuccess = Thread.currentThread().getContextClassLoader() instanceof URLClassLoader;
  9. try {
  10. vertx.deployVerticle(verticleID, new DeploymentOptions().
  11. setIsolationGroup(group1).setIsolatedClasses(isolatedClasses), ar -> {
  12. assertTrue(ar.succeeded());
  13. deploymentID1.set(ar.result());
  14. assertEquals(0, TestVerticle.instanceCount.get());
  15. vertx.deployVerticle(verticleID,
  16. new DeploymentOptions().setIsolationGroup(group2).setIsolatedClasses(isolatedClasses), ar2 -> {
  17. assertTrue(ar2.succeeded());
  18. deploymentID2.set(ar2.result());
  19. assertEquals(0, TestVerticle.instanceCount.get());

代码示例来源:origin: eclipse-vertx/vert.x

  1. @Test
  2. public void testSendWithTimeoutNoReply() {
  3. String str = TestUtils.randomUnicodeString(1000);
  4. eb.<String>consumer(ADDRESS1).handler((Message<String> msg) -> {
  5. assertEquals(str, msg.body());
  6. });
  7. long timeout = 1000;
  8. long start = System.currentTimeMillis();
  9. eb.send(ADDRESS1, str, new DeliveryOptions().setSendTimeout(timeout), (AsyncResult<Message<Integer>> ar) -> {
  10. long now = System.currentTimeMillis();
  11. assertFalse(ar.succeeded());
  12. Throwable cause = ar.cause();
  13. assertTrue(cause instanceof ReplyException);
  14. ReplyException re = (ReplyException) cause;
  15. assertEquals(-1, re.failureCode());
  16. assertEquals(ReplyFailure.TIMEOUT, re.failureType());
  17. assertTrue(now - start >= timeout);
  18. testComplete();
  19. });
  20. await();
  21. }

代码示例来源:origin: xenv/gushici

  1. private void getHelpFromRedis(Message message) {
  2. redisClient.lrange(Key.REDIS_HELP_LIST, 0, -1, res -> {
  3. if (res.succeeded()) {
  4. JsonArray array = res.result();
  5. JsonArray newArray = array.stream()
  6. .map(text -> {
  7. String prefix = config().getString("api.url", "http://localhost/");
  8. return new JsonObject((String) text).stream()
  9. .collect(Collectors.toMap(Map.Entry::getKey,
  10. v -> prefix + v.getValue().toString().replace(":", "/")));
  11. })
  12. .collect(JsonCollector.toJsonArray());
  13. message.reply(newArray);
  14. } else {
  15. log.error("Fail to get data from Redis", res.cause());
  16. message.fail(500, res.cause().getMessage());
  17. }
  18. });
  19. }

代码示例来源:origin: vert-x3/vertx-examples

  1. private void listAlbums(Message<JsonObject> msg) {
  2. // issue a find command to mongo to fetch all documents from the "albums" collection.
  3. mongo.find("albums", new JsonObject(), lookup -> {
  4. // error handling
  5. if (lookup.failed()) {
  6. msg.fail(500, lookup.cause().getMessage());
  7. return;
  8. }
  9. // now convert the list to a JsonArray because it will be easier to encode the final object as the response.
  10. final JsonArray json = new JsonArray();
  11. for (JsonObject o : lookup.result()) {
  12. json.add(o);
  13. }
  14. msg.reply(json);
  15. });
  16. }

代码示例来源:origin: eclipse-vertx/vert.x

  1. @Test
  2. public void testRegisterRemote1() {
  3. startNodes(2);
  4. String str = TestUtils.randomUnicodeString(100);
  5. vertices[0].eventBus().<String>consumer(ADDRESS1).handler((Message<String> msg) -> {
  6. assertEquals(str, msg.body());
  7. testComplete();
  8. }).completionHandler(ar -> {
  9. assertTrue(ar.succeeded());
  10. vertices[1].eventBus().send(ADDRESS1, str);
  11. });
  12. await();
  13. }

代码示例来源:origin: xenv/gushici

  1. /**
  2. * @param message example: {format: "png", categories: [shenghuo, buyi]}
  3. */
  4. private void getGushiciFromRedis(Message<JsonObject> message) {
  5. JsonArray realCategory = new JsonArray()
  6. .add("png".equals(message.body().getString("format")) ? "img" : "json")
  7. .addAll(message.body().getJsonArray("categories"));
  8. checkAndGetKey(realCategory)
  9. .compose(key -> Future.<String>future(s -> redisClient.srandmember(key, s))) // 从 set 随机返回一个对象
  10. .setHandler(res -> {
  11. if (res.succeeded()) {
  12. message.reply(res.result());
  13. } else {
  14. if (res.cause() instanceof ReplyException) {
  15. ReplyException exception = (ReplyException) res.cause();
  16. message.fail(exception.failureCode(), exception.getMessage());
  17. }
  18. message.fail(500, res.cause().getMessage());
  19. }
  20. });
  21. }

代码示例来源:origin: eclipse-vertx/vert.x

  1. @Test
  2. public void testReplyToSendWithNoReplyHandler() {
  3. eb.<String>consumer(ADDRESS1).handler((Message<String> msg) -> {
  4. msg.reply("a reply");
  5. testComplete();
  6. });
  7. eb.send(ADDRESS1, "whatever");
  8. await();
  9. }

代码示例来源:origin: eclipse-vertx/vert.x

  1. @Test
  2. public void testSendWithTimeoutNoTimeoutReply() {
  3. String str = TestUtils.randomUnicodeString(1000);
  4. eb.<String>consumer(ADDRESS1).handler((Message<String> msg) -> {
  5. assertEquals(str, msg.body());
  6. msg.reply(23);
  7. });
  8. long timeout = 1000;
  9. eb.send(ADDRESS1, str, new DeliveryOptions().setSendTimeout(timeout), (AsyncResult<Message<Integer>> ar) -> {
  10. assertTrue(ar.succeeded());
  11. assertEquals(23, (int) ar.result().body());
  12. testComplete();
  13. });
  14. await();
  15. }

代码示例来源:origin: vert-x3/vertx-examples

  1. @Override
  2. public void start() throws Exception {
  3. // A simple backend
  4. vertx.eventBus().<JsonObject>consumer("backend", msg -> {
  5. JsonObject json = msg.body();
  6. switch (json.getString("op", "")) {
  7. case "get": {
  8. String productID = json.getString("id");
  9. msg.reply(products.get(productID));
  10. break;
  11. }
  12. case "add": {
  13. String productID = json.getString("id");
  14. JsonObject product = json.getJsonObject("product");
  15. product.put("id", productID);
  16. msg.reply(addProduct(product));
  17. break;
  18. }
  19. case "list": {
  20. JsonArray arr = new JsonArray();
  21. products.forEach((k, v) -> arr.add(v));
  22. msg.reply(arr);
  23. break;
  24. }
  25. default: {
  26. msg.fail(0, "operation not permitted");
  27. }
  28. }
  29. });
  30. }

相关文章