Usage and code examples of the reactor.core.publisher.Mono.create() method

x33g5p2x, reposted on 2022-01-24 in Other

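This article collects Java code examples for the reactor.core.publisher.Mono.create() method and shows how Mono.create() is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they should be useful as a reference. Details of the Mono.create() method are as follows:
Package: reactor.core.publisher
Class name: Mono
Method name: create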

About Mono.create

Creates a deferred emitter that can be used with callback-based APIs to signal at most one value, a completion, or an error.

Bridging a legacy API involves mostly boilerplate code due to the lack of standard types and methods. There are two kinds of API surfaces: 1) addListener/removeListener and 2) callback-handler.

  1. addListener/removeListener pairs
    To work with such an API, one has to instantiate the listener, call the sink from the listener, then register it with the source:
Mono.<String>create(sink -> {
    HttpListener listener = event -> {
        if (event.getResponseCode() >= 400) {
            sink.error(new RuntimeException("Failed"));
        } else {
            String body = event.getBody();
            if (body.isEmpty()) {
                sink.success();
            } else {
                sink.success(body.toLowerCase());
            }
        }
    };
    client.addListener(listener);
    sink.onDispose(() -> client.removeListener(listener));
});

Note that this works only with single-value emitting listeners. Otherwise, all subsequent signals are dropped. You may have to add client.removeListener(this); to the listener's body.

  2. callback handler
    This requires an instantiation pattern similar to the one above, but usually successful completion and error are separated into different methods. In addition, the legacy API may or may not support some cancellation mechanism.
Mono.<String>create(sink -> {
    Callback<String> callback = new Callback<String>() {
        @Override
        public void onResult(String data) {
            sink.success(data.toLowerCase());
        }
        @Override
        public void onError(Exception e) {
            sink.error(e);
        }
    };
    // Option A, without cancellation support:
    client.call("query", callback);
    // Option B, with cancellation support (use instead of option A, not in addition to it):
    AutoCloseable cancel = client.call("query", callback);
    sink.onDispose(() -> {
        try {
            cancel.close();
        } catch (Exception ex) {
            Exceptions.onErrorDropped(ex);
        }
    });
});
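
The cancellation branch above has to wrap a checked exception from close() before it can serve as a cleanup action. The following is a minimal plain-JDK sketch of that adaptation. `DisposeHooks`, `closing`, and the `onDropped` handler are hypothetical names introduced for illustration and are not part of Reactor.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical helper (not a Reactor class): adapts an AutoCloseable to a cleanup action that never throws. */
final class DisposeHooks {
    private DisposeHooks() {}

    static Runnable closing(AutoCloseable resource, Consumer<? super Exception> onDropped) {
        return () -> {
            try {
                resource.close();
            } catch (Exception ex) {
                // close() may throw a checked exception; pass it to the caller-supplied handler
                onDropped.accept(ex);
            }
        };
    }
}

final class DisposeHooksDemo {
    public static void main(String[] args) {
        List<Exception> dropped = new ArrayList<>();
        // A stand-in for the legacy API's cancellation handle; this one fails on close
        AutoCloseable failing = () -> { throw new IllegalStateException("close failed"); };
        Runnable hook = DisposeHooks.closing(failing, dropped::add);
        hook.run(); // does not throw; the exception is collected instead
        System.out.println("dropped exceptions: " + dropped.size());
    }
}
```

With Reactor on the classpath, such a hook could presumably be registered as sink.onDispose(hook::run), since Disposable is a functional interface.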


Code examples

Code example source: codecentric/spring-boot-admin

private Mono<Void> writeAndFlush(Flux<DataBuffer> body, OutputStream responseBody) {
  // Write all buffers, then flush the stream and signal completion (or the I/O error)
  return DataBufferUtils.write(body, responseBody).map(DataBufferUtils::release).then(Mono.create(sink -> {
    try {
      responseBody.flush();
      sink.success();
    } catch (IOException ex) {
      sink.error(ex);
    }
  }));
}

Code example source: AxonFramework/AxonFramework

/**
 * Creates the wrapper by creating the delegate mono.
 *
 * @param callback Gets the wrapper around {@link reactor.core.publisher.MonoSink} which can be used to complete the
 *                 mono
 * @param <T>      The type of data sent via this mono
 * @return instance of created wrapper
 */
public static <T> MonoWrapper<T> create(Consumer<MonoSinkWrapper<T>> callback) {
  return new MonoWrapper<>(Mono.create(monoSink -> callback.accept(new MonoSinkWrapper<>(monoSink))));
}

Code example source: reactor/reactor-core

@Test
public void monoCreateOnCancel() {
  AtomicBoolean cancelled = new AtomicBoolean();
  Mono.create(s -> s.onCancel(() -> cancelled.set(true)).success("test")).block();
  assertThat(cancelled.get()).isFalse();
  Mono.create(s -> s.onCancel(() -> cancelled.set(true)).success()).block();
  assertThat(cancelled.get()).isFalse();
}

Code example source: reactor/reactor-core

@Test
public void createStreamFromMonoCreateError() {
  AtomicInteger onDispose = new AtomicInteger();
  AtomicInteger onCancel = new AtomicInteger();
  StepVerifier.create(Mono.create(s -> {
            s.onDispose(onDispose::getAndIncrement)
             .onCancel(onCancel::getAndIncrement)
             .error(new Exception("test"));
          }))
        .verifyErrorMessage("test");
  assertThat(onDispose.get()).isEqualTo(1);
  assertThat(onCancel.get()).isEqualTo(0);
}

Code example source: reactor/reactor-core

Mono<String> exponentialRetryScenario() {
  AtomicInteger i = new AtomicInteger();
  return Mono.<String>create(s -> {
    if (i.incrementAndGet() == 4) {
      s.success("hey");
    }
    else {
      s.error(new RuntimeException("test " + i));
    }
  }).retryWhen(repeat -> repeat.zipWith(Flux.range(1, 3), (t1, t2) -> t2)
                 .flatMap(time -> Mono.delay(Duration.ofSeconds(time))));
}
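
To make the timing of the scenario above easier to follow, here is a plain-JDK sketch of the delay schedule. It assumes each failure is paired, in order, with one value from Flux.range(1, 3) and that value is used as a delay in seconds. `RetryScheduleSketch` and its methods are hypothetical names, and the arithmetic only models the schedule rather than running Reactor.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of the delays in the scenario above; it does not use Reactor. */
final class RetryScheduleSketch {
    private RetryScheduleSketch() {}

    /** Failure k (1-based) waits k seconds, for at most maxRetries retries. */
    static List<Duration> delays(int failuresBeforeSuccess, int maxRetries) {
        List<Duration> result = new ArrayList<>();
        int retries = Math.min(failuresBeforeSuccess, maxRetries);
        for (int k = 1; k <= retries; k++) {
            result.add(Duration.ofSeconds(k));
        }
        return result;
    }

    /** Sum of all waits before the final attempt. */
    static Duration total(List<Duration> delays) {
        Duration sum = Duration.ZERO;
        for (Duration d : delays) {
            sum = sum.plus(d);
        }
        return sum;
    }

    public static void main(String[] args) {
        // Attempts 1 to 3 fail and attempt 4 succeeds, so three retries are scheduled
        List<Duration> schedule = delays(3, 3);
        System.out.println("waits in seconds: " + schedule.size()
                + ", total: " + total(schedule).getSeconds());
    }
}
```

Under these assumptions the waits are 1, 2, and 3 seconds (6 seconds in total), so the schedule grows linearly despite the "exponential" in the method name.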

Code example source: reactor/reactor-core

Flux<String> exponentialRepeatScenario2() {
  AtomicInteger i = new AtomicInteger();
  return Mono.<String>create(s -> {
    if (i.incrementAndGet() == 4) {
      s.success("hey");
    }
    else {
      s.success();
    }
  }).repeatWhen(repeat -> repeat.zipWith(Flux.range(1, 3), (t1, t2) -> t2)
                 .flatMap(time -> Mono.delay(Duration.ofSeconds(time))));
}

Code example source: reactor/reactor-core

@Test
public void sinkApiSuccessAfterErrorIsIgnored() {
  Mono<String> secondIsValuedSuccess = Mono.create(sink -> {
    sink.error(new IllegalArgumentException("boom"));
    sink.success("bar");
  });
  StepVerifier.create(secondIsValuedSuccess)
        .verifyErrorMessage("boom");
}

Code example source: reactor/reactor-core

@Test
public void sinkApiEmptySuccessAfterErrorIsIgnored() {
  Mono<String> secondIsEmptySuccess = Mono.create(sink -> {
    sink.error(new IllegalArgumentException("boom"));
    sink.success();
  });
  StepVerifier.create(secondIsEmptySuccess)
        .verifyErrorMessage("boom");
}

Code example source: reactor/reactor-core

@Test
public void sinkToString() {
  StepVerifier.create(Mono.create(sink -> sink.success(sink.toString())))
        .expectNext("MonoSink")
        .verifyComplete();
}

Code example source: reactor/reactor-core

@Test
public void sinkApiErrorAfterErrorBubblesAndDrops() {
  Mono<String> secondIsError = Mono.create(sink -> {
    sink.error(new IllegalArgumentException("boom1"));
    sink.error(new IllegalArgumentException("boom2"));
  });
  StepVerifier.create(secondIsError)
        .expectErrorMessage("boom1")
        .verifyThenAssertThat()
        .hasOperatorErrorWithMessage("boom2");
}

Code example source: reactor/reactor-core

@Test
public void contextTest() {
  StepVerifier.create(Mono.create(s -> s.success(s.currentContext()
                          .get(AtomicInteger.class)
                          .incrementAndGet()))
              .subscriberContext(ctx -> ctx.put(AtomicInteger.class,
                  new AtomicInteger())))
        .expectNext(1)
        .verifyComplete();
}

Code example source: reactor/reactor-core

@Test
public void sinkApiEmptySuccessAfterEmptySuccessIsIgnored() {
  Mono<String> secondIsEmptySuccess = Mono.create(sink -> {
    sink.success();
    sink.success();
  });
  StepVerifier.create(secondIsEmptySuccess)
        .verifyComplete();
}

Code example source: reactor/reactor-core

@Test
public void sinkApiErrorAfterEmptySuccessBubblesAndDrops() {
  Mono<String> secondIsError = Mono.create(sink -> {
    sink.success();
    sink.error(new IllegalArgumentException("boom"));
  });
  StepVerifier.create(secondIsError)
        .expectComplete()
        .verifyThenAssertThat()
        .hasOperatorErrorWithMessage("boom");
}

Code example source: reactor/reactor-core

@Test
public void createStreamFromMonoCreate2() {
  StepVerifier.create(Mono.create(MonoSink::success)
              .publishOn(Schedulers.parallel()))
        .verifyComplete();
}

Code example source: reactor/reactor-core

@Test
public void sinkApiSuccessAfterEmptySuccessIsIgnored() {
  Mono<String> secondIsValuedSuccess = Mono.create(sink -> {
    sink.success();
    sink.success("foo");
  });
  StepVerifier.create(secondIsValuedSuccess)
        .verifyComplete();
}

Code example source: reactor/reactor-core

@Test
public void sinkApiErrorAfterSuccessBubblesAndDrops() {
  Mono<String> secondIsError = Mono.create(sink -> {
    sink.success("foo");
    sink.error(new IllegalArgumentException("boom"));
  });
  StepVerifier.create(secondIsError)
        .expectNext("foo")
        .expectComplete()
        .verifyThenAssertThat()
        .hasOperatorErrorWithMessage("boom");
}
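
The sinkApi tests in this list all exercise the same rule: the first terminal signal wins, a later success is ignored, and a later error is dropped rather than delivered. Below is a minimal plain-JDK model of that rule. `OneShotSink` is a hypothetical class written for illustration; it is single-threaded and is not how Reactor's MonoSink is implemented.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, single-threaded model of "at most one terminal signal"; not Reactor's MonoSink. */
final class OneShotSink<T> {
    private boolean terminated;
    private final List<String> delivered = new ArrayList<>();
    private final List<Throwable> droppedErrors = new ArrayList<>();

    /** Completes without a value; ignored if already terminated. */
    void success() {
        if (terminated) return;
        terminated = true;
        delivered.add("complete");
    }

    /** Emits one value and completes; ignored if already terminated. */
    void success(T value) {
        if (terminated) return;
        terminated = true;
        delivered.add("next:" + value);
        delivered.add("complete");
    }

    /** Terminates with an error; a late error is recorded as dropped instead of delivered. */
    void error(Throwable t) {
        if (terminated) {
            droppedErrors.add(t);
            return;
        }
        terminated = true;
        delivered.add("error:" + t.getMessage());
    }

    List<String> delivered() { return delivered; }

    List<Throwable> droppedErrors() { return droppedErrors; }
}

final class OneShotSinkDemo {
    public static void main(String[] args) {
        OneShotSink<String> sink = new OneShotSink<>();
        sink.success("foo");                              // first signal wins
        sink.success("bar");                              // ignored
        sink.error(new IllegalArgumentException("boom")); // dropped
        System.out.println(sink.delivered());
        System.out.println(sink.droppedErrors().size());
    }
}
```

In this model the dropped-errors list stands in for whatever dropped-error handling the real library applies; the tests above observe that through hasOperatorErrorWithMessage.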

Code example source: reactor/reactor-core

@Test
public void createStreamFromMonoCreateHide() {
  StepVerifier.create(Mono.create(s -> s.success("test1")).hide())
        .expectNext("test1")
        .verifyComplete();
}

Code example source: reactor/reactor-core

@Test
public void sinkApiEmptySuccessAfterSuccessIsIgnored() {
  Mono<String> secondIsEmptySuccess = Mono.create(sink -> {
    sink.success("foo");
    sink.success();
  });
  StepVerifier.create(secondIsEmptySuccess)
        .expectNext("foo")
        .verifyComplete();
}

Code example source: reactor/reactor-core

@Test
public void sinkApiSuccessAfterSuccessIsIgnored() {
  Mono<String> secondIsValuedSuccess = Mono.create(sink -> {
    sink.success("foo");
    sink.success("bar");
  });
  StepVerifier.create(secondIsValuedSuccess)
        .expectNext("foo")
        .verifyComplete();
}

Code example source: reactor/reactor-core

@Test
public void predicateThrows() {
  AssertSubscriber<Object> ts = AssertSubscriber.create(2);
  Mono.create(s -> s.success(1))
    .filter(v -> {
      throw new RuntimeException("forced failure");
    })
    .subscribe(ts);
  ts.assertNoValues()
   .assertNotComplete()
   .assertError(RuntimeException.class)
   .assertErrorMessage("forced failure");
}
