Usage and code examples of the reactor.core.publisher.Mono.sequenceEqual() method

x33g5p2x, reposted on 2022-01-24 in Other

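This article collects code examples for the Java method reactor.core.publisher.Mono.sequenceEqual() and shows how Mono.sequenceEqual() is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven and were extracted from selected projects, so they should serve as a useful reference. Details of the Mono.sequenceEqual() method:
Package path: reactor.core.publisher.Mono
Class name: Mono
Method name: sequenceEqual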

Introduction to Mono.sequenceEqual

Returns a Mono that emits a Boolean value that indicates whether two Publisher sequences are the same by comparing the items emitted by each Publisher pairwise.
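
The pairwise comparison described above can be modeled synchronously with plain iterators. The following is a minimal sketch of the semantics only, not Reactor's implementation: the class `PairwiseEqualSketch` and its `sequenceEqual` method are hypothetical names introduced for illustration, and the real operator works asynchronously on Publisher sources.

```java
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.function.BiPredicate;

// Hypothetical, synchronous model of the comparison semantics; not Reactor code.
public class PairwiseEqualSketch {

    // Walks both sequences in lockstep and compares each pair with isEqual.
    public static <T> boolean sequenceEqual(Iterable<? extends T> left,
                                            Iterable<? extends T> right,
                                            BiPredicate<? super T, ? super T> isEqual) {
        Iterator<? extends T> a = left.iterator();
        Iterator<? extends T> b = right.iterator();
        while (a.hasNext() && b.hasNext()) {
            if (!isEqual.test(a.next(), b.next())) {
                return false; // first mismatching pair decides the result
            }
        }
        // Equal only if both sequences ended at the same position.
        return !a.hasNext() && !b.hasNext();
    }

    public static void main(String[] args) {
        System.out.println(sequenceEqual(List.of(1, 2, 3), List.of(1, 2, 3), Objects::equals)); // true
        System.out.println(sequenceEqual(List.of(1, 3), List.of(1, 2, 3), Objects::equals));    // false
        System.out.println(sequenceEqual(List.of("one", "two", "three"),
                List.of("one", "two", "three", "four"), Objects::equals));                      // false
    }
}
```

The three calls mirror cases covered by the tests in this article: identical sequences, a mismatching element, and sequences of different length.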

Code examples

Code example source: origin: reactor/reactor-core

/**
 * Returns a Mono that emits a Boolean value that indicates whether two Publisher sequences are the
 * same by comparing the items emitted by each Publisher pairwise based on the results of a specified
 * equality function.
 *
 * @param source1
 *            the first Publisher to compare
 * @param source2
 *            the second Publisher to compare
 * @param isEqual
 *            a function used to compare items emitted by each Publisher
 * @param <T>
 *            the type of items emitted by each Publisher
 * @return a Mono that emits a Boolean value that indicates whether the two Publisher sequences
 *         are the same according to the specified function
 */
public static <T> Mono<Boolean> sequenceEqual(Publisher<? extends T> source1, Publisher<? extends T> source2,
    BiPredicate<? super T, ? super T> isEqual) {
  return sequenceEqual(source1, source2, isEqual, Queues.SMALL_BUFFER_SIZE);
}
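
One reason to pass an explicit `isEqual` is that the default comparison may not fit the element type. Assuming the two-argument overload compares items with `equals()` (it delegates to `equalsBiPredicate()`), elements such as arrays would be compared by identity rather than by content. The plain-Java sketch below shows how candidate predicates differ; the class and constant names are hypothetical and are not part of Reactor.

```java
import java.util.Arrays;
import java.util.Objects;
import java.util.function.BiPredicate;

// Hypothetical holder for predicates that could be passed as isEqual.
public class EqualityPredicateSketch {

    // equals()-based comparison: for arrays this is reference identity.
    public static final BiPredicate<int[], int[]> BY_EQUALS = Objects::equals;

    // Content-based comparison for int arrays.
    public static final BiPredicate<int[], int[]> BY_CONTENT = Arrays::equals;

    // Case-insensitive comparison for strings.
    public static final BiPredicate<String, String> IGNORE_CASE = String::equalsIgnoreCase;

    public static void main(String[] args) {
        int[] first = {1, 2};
        int[] second = {1, 2};
        System.out.println(BY_EQUALS.test(first, second));  // false: distinct array instances
        System.out.println(BY_CONTENT.test(first, second)); // true: same contents
        System.out.println(IGNORE_CASE.test("One", "one")); // true
    }
}
```

Such a predicate would go in the third argument position of the overload shown above, for example `Mono.sequenceEqual(source1, source2, String::equalsIgnoreCase)`.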

Code example source: origin: reactor/reactor-core

/**
 * Returns a Mono that emits a Boolean value that indicates whether two Publisher sequences are the
 * same by comparing the items emitted by each Publisher pairwise.
 *
 * @param source1
 *            the first Publisher to compare
 * @param source2
 *            the second Publisher to compare
 * @param <T>
 *            the type of items emitted by each Publisher
 * @return a Mono that emits a Boolean value that indicates whether the two sequences are the same
 */
public static <T> Mono<Boolean> sequenceEqual(Publisher<? extends T> source1, Publisher<? extends T> source2) {
  return sequenceEqual(source1, source2, equalsBiPredicate(), Queues.SMALL_BUFFER_SIZE);
}

Code example source: origin: reactor/reactor-core

@Test
public void equalPredicateFailure() {
  StepVerifier.create(Mono.sequenceEqual(Mono.just("one"), Mono.just("one"),
          (s1, s2) -> { throw new IllegalStateException("boom"); }))
        .verifyErrorMessage("boom");
}

Code example source: origin: reactor/reactor-core

@Test
public void sequenceEqual() throws Exception {
  boolean res = Mono.sequenceEqual(Flux.just(1, 2, 3), Flux.just(1, 2, 3))
           .block();
  assertTrue(res);
  res = Mono.sequenceEqual(Flux.just(1, 3), Flux.just(1, 2, 3))
           .block();
  assertFalse(res);
}

Code example source: origin: reactor/reactor-core

@Test
public void differenceCancelsBothSources() {
  AtomicBoolean sub1 = new AtomicBoolean();
  AtomicBoolean sub2 = new AtomicBoolean();
  Flux<Integer> source1 = Flux.range(1, 5).doOnCancel(() -> sub1.set(true));
  Flux<Integer> source2 = Flux.just(1, 2, 3, 7, 8).doOnCancel(() -> sub2.set(true));
  StepVerifier.create(Mono.sequenceEqual(source1, source2))
        .expectNext(Boolean.FALSE)
        .verifyComplete();
  Assert.assertTrue("left not cancelled", sub1.get());
  Assert.assertTrue("right not cancelled", sub2.get());
}

Code example source: origin: reactor/reactor-core

@Test
public void syncFusedCrash() {
  Flux<Integer> source = Flux.range(1, 10).map(i -> { throw new IllegalArgumentException("boom"); });
  StepVerifier.create(Mono.sequenceEqual(source, Flux.range(1, 10).hide()))
        .verifyErrorMessage("boom");
  StepVerifier.create(Mono.sequenceEqual(Flux.range(1, 10).hide(), source))
        .verifyErrorMessage("boom");
}

Code example source: origin: reactor/reactor-core

@Test
public void doubleCancelCancelsOnce() {
  AtomicReference<Subscription> sub1 = new AtomicReference<>();
  AtomicReference<Subscription> sub2 = new AtomicReference<>();
  AtomicLong cancel1 = new AtomicLong();
  AtomicLong cancel2 = new AtomicLong();
  Flux<Integer> source1 = Flux.range(1, 5)
                .doOnSubscribe(sub1::set)
                .doOnCancel(cancel1::incrementAndGet)
                .hide();
  Flux<Integer> source2 = Flux.just(1, 2, 3, 7, 8)
                .doOnSubscribe(sub2::set)
                .doOnCancel(cancel2::incrementAndGet)
                .hide();
  Mono.sequenceEqual(source1, source2)
    .subscribe(System.out::println, Throwable::printStackTrace, null,
        s -> { s.cancel(); s.cancel(); });
  Assert.assertNotNull("left not subscribed", sub1.get());
  assertThat(cancel1.get()).isEqualTo(1);
  Assert.assertNotNull("right not subscribed", sub2.get());
  assertThat(cancel2.get()).isEqualTo(1);
}

Code example source: origin: reactor/reactor-core

@Test
public void cancelCancelsBothSourcesIncludingNever() {
  AtomicReference<Subscription> sub1 = new AtomicReference<>();
  AtomicReference<Subscription> sub2 = new AtomicReference<>();
  AtomicBoolean cancel1 = new AtomicBoolean();
  AtomicBoolean cancel2 = new AtomicBoolean();
  Flux<Integer> source1 = Flux.range(1, 5)
                .doOnSubscribe(sub1::set)
                .doOnCancel(() -> cancel1.set(true))
                .hide();
  Flux<Integer> source2 = Flux.<Integer>never()
                .doOnSubscribe(sub2::set)
                .doOnCancel(() -> cancel2.set(true));
  Mono.sequenceEqual(source1, source2)
    .subscribe(System.out::println, Throwable::printStackTrace, null,
        Subscription::cancel);
  Assert.assertNotNull("left not subscribed", sub1.get());
  Assert.assertTrue("left not cancelled", cancel1.get());
  Assert.assertNotNull("right not subscribed", sub2.get());
  Assert.assertTrue("right not cancelled", cancel2.get());
}

Code example source: origin: reactor/reactor-core

@Test
public void sequenceErrorsLeft() {
  StepVerifier.create(Mono.sequenceEqual(
          Flux.just("one", "two").concatWith(Mono.error(new IllegalStateException())),
          Flux.just("one", "two", "three")))
        .verifyError(IllegalStateException.class);
}

Code example source: origin: reactor/reactor-core

@Test
public void sequenceErrorsRight() {
  StepVerifier.create(Mono.sequenceEqual(
          Flux.just("one", "two", "three"),
          Flux.just("one", "two").concatWith(Mono.error(new IllegalStateException()))))
        .verifyError(IllegalStateException.class);
}

Code example source: origin: reactor/reactor-core

@Test
public void sequenceLongerRight() {
  StepVerifier.create(Mono.sequenceEqual(
          Flux.just("one", "two", "three"),
          Flux.just("one", "two", "three", "four")))
        .expectNext(Boolean.FALSE)
        .verifyComplete();
}

Code example source: origin: reactor/reactor-core

@Test
public void largeSequence() {
  Flux<Integer> source = Flux.range(1, Queues.SMALL_BUFFER_SIZE * 4).subscribeOn(Schedulers.boundedElastic());
  StepVerifier.create(Mono.sequenceEqual(source, source))
        .expectNext(Boolean.TRUE)
        .expectComplete()
        .verify(Duration.ofSeconds(5));
}

Code example source: origin: reactor/reactor-core

@Test
public void sequenceLongerLeft() {
  StepVerifier.create(Mono.sequenceEqual(
          Flux.just("one", "two", "three", "four"),
          Flux.just("one", "two", "three")))
        .expectNext(Boolean.FALSE)
        .verifyComplete();
}

Code example source: origin: reactor/reactor-core

@Test
public void sequenceEmptyRight() {
  StepVerifier.create(Mono.sequenceEqual(
      Flux.just("one", "two", "three"),
      Flux.empty()))
        .expectNext(Boolean.FALSE)
        .verifyComplete();
}

Code example source: origin: reactor/reactor-core

@Test
public void sequenceEquals() {
  StepVerifier.create(Mono.sequenceEqual(
          Flux.just("one", "two", "three"),
          Flux.just("one", "two", "three")))
        .expectNext(Boolean.TRUE)
        .verifyComplete();
}

Code example source: origin: reactor/reactor-core

@Test
public void sequenceErrorsBothPropagatesLeftError() {
  StepVerifier.create(Mono.sequenceEqual(
          Flux.just("one", "two", "three", "four").concatWith(Mono.error(new IllegalArgumentException("left"))).hide(),
          Flux.just("one", "two").concatWith(Mono.error(new IllegalArgumentException("right"))).hide()))
        .verifyErrorMessage("left");
}

Code example source: origin: reactor/reactor-core

@Test
public void sequenceErrorsBothPropagatesLeftErrorWithSmallRequest() {
  StepVerifier.create(Mono.sequenceEqual(
          Flux.just("one", "two", "three", "four")
            .concatWith(Mono.error(new IllegalArgumentException("left")))
            .hide(),
          Flux.just("one", "two")
            .concatWith(Mono.error(new IllegalArgumentException("right")))
            .hide(),
          Objects::equals, 1))
        .verifyErrorMessage("right");
}

Code example source: origin: reactor/reactor-core

@Test
public void sequenceEmptyBoth() {
  StepVerifier.create(Mono.sequenceEqual(
      Flux.empty(),
      Flux.empty()))
        .expectNext(Boolean.TRUE)
        .verifyComplete();
}

Code example source: origin: reactor/reactor-core

@Test
public void sequenceEmptyLeft() {
  StepVerifier.create(Mono.sequenceEqual(
      Flux.empty(),
      Flux.just("one", "two", "three")))
        .expectNext(Boolean.FALSE)
        .verifyComplete();
}

Code example source: origin: reactor/reactor-core

@Test
public void subscribeInnerOnce() {
  LongAdder innerSub1 = new LongAdder();
  LongAdder innerSub2 = new LongAdder();
  Flux<Integer> source1 = Flux.range(1, 5)
                .doOnSubscribe((t) -> innerSub1.increment());
  Flux<Integer> source2 = Flux.just(1, 2, 3, 7, 8)
                .doOnSubscribe((t) -> innerSub2.increment());
  Mono.sequenceEqual(source1, source2)
    .subscribe();
  Assert.assertEquals("left has been subscribed multiple times", 1, innerSub1.intValue());
  Assert.assertEquals("right has been subscribed multiple times", 1, innerSub2.intValue());
}
