reactor.core.publisher.Operators.emptySubscription()方法的使用及代码示例

x33g5p2x  于2022-01-25 转载在 其他  
字(10.4k)|赞(0)|评价(0)|浏览(168)

本文整理了Java中reactor.core.publisher.Operators.emptySubscription()方法的一些代码示例,展示了Operators.emptySubscription()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Operators.emptySubscription()方法的具体详情如下:
包路径:reactor.core.publisher.Operators
类名称:Operators
方法名:emptySubscription

Operators.emptySubscription介绍

[英]A singleton enumeration that represents a no-op Subscription instance that can be freely given out to clients.

The enum also implements Fuseable.QueueSubscription so operators expecting a QueueSubscription from a Fuseable source don't have to double-check their Subscription received in onSubscribe.
[中]一个单例枚举,表示可以免费提供给客户端的无操作订阅实例。
enum还实现了Fusable。QueueSubscription,因此希望从可融合源获得QueueSubscription的操作员不必再次检查他们在OnSubscripte中收到的订阅。

代码示例

代码示例来源:origin: reactor/reactor-core

void handleTimeout() {
    if (other == null) {
      super.cancel();
      actual.onError(new TimeoutException("Did not observe any item or terminal signal within "
          + timeoutDescription + " (and no fallback has been configured)"));
    }
    else {
      set(Operators.emptySubscription());
      other.subscribe(new TimeoutOtherSubscriber<>(actual, this));
    }
  }
}

代码示例来源:origin: reactor/reactor-core

@Test
public void scanProcessor() {
  MonoProcessor<String> test = MonoProcessor.create();
  Subscription subscription = Operators.emptySubscription();
  test.onSubscribe(subscription);
  assertThat(test.scan(Scannable.Attr.PREFETCH)).isEqualTo(Integer.MAX_VALUE);
  assertThat(test.scan(Scannable.Attr.TERMINATED)).isFalse();
  assertThat(test.scan(Scannable.Attr.CANCELLED)).isFalse();
  test.onComplete();
  assertThat(test.scan(Scannable.Attr.TERMINATED)).isTrue();
  assertThat(test.scan(Scannable.Attr.CANCELLED)).isFalse();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void scanProcessorCancelled() {
  MonoProcessor<String> test = MonoProcessor.create();
  Subscription subscription = Operators.emptySubscription();
  test.onSubscribe(subscription);
  assertThat(test.scan(Scannable.Attr.PREFETCH)).isEqualTo(Integer.MAX_VALUE);
  assertThat(test.scan(Scannable.Attr.TERMINATED)).isFalse();
  assertThat(test.scan(Scannable.Attr.CANCELLED)).isFalse();
  test.cancel();
  assertThat(test.scan(Scannable.Attr.TERMINATED)).isFalse();
  assertThat(test.scan(Scannable.Attr.CANCELLED)).isTrue();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void scanDeferredSubscription() {
  DeferredSubscription test = new DeferredSubscription();
  test.s = Operators.emptySubscription();
  assertThat(test.scan(Scannable.Attr.PARENT)).isSameAs(test.s);
  test.requested = 123;
  assertThat(test.scan(Scannable.Attr.REQUESTED_FROM_DOWNSTREAM)).isEqualTo(123);
  assertThat(test.scan(Scannable.Attr.CANCELLED)).isFalse();
  test.cancel();
  assertThat(test.scan(Scannable.Attr.CANCELLED)).isTrue();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void scanOverlappingSubscriberCancelled() {
  CoreSubscriber<? super List>
      actual = new LambdaSubscriber<>(null, e -> {}, null, null);
  FluxBuffer.BufferOverlappingSubscriber<String, List<String>> test = new FluxBuffer.BufferOverlappingSubscriber<>(
      actual, 23, 5, ArrayList::new);
  Subscription parent = Operators.emptySubscription();
  test.onSubscribe(parent);
  assertThat(test.scan(Scannable.Attr.CANCELLED)).isFalse();
  test.cancel();
  assertThat(test.scan(Scannable.Attr.CANCELLED)).isTrue();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void ignoreDoubleOnSubscribeInner() {
  StepVerifier.create(Flux.just(1).hide()
              .flatMap(f -> Flux.from(s -> {
                s.onSubscribe(Operators.emptySubscription());
                s.onSubscribe(Operators.emptySubscription());
                s.onComplete();
              })))
        .verifyComplete();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void scanSubscriberCancelled() {
  CoreSubscriber<String> actual = new LambdaSubscriber<>(null, e -> {}, null, null);
  FluxDefaultIfEmpty.DefaultIfEmptySubscriber<String> test =
      new FluxDefaultIfEmpty.DefaultIfEmptySubscriber<>(actual, "bar");
  Subscription parent = Operators.emptySubscription();
  test.onSubscribe(parent);
  assertThat(test.scan(Scannable.Attr.CANCELLED)).isFalse();
  test.cancel();
  assertThat(test.scan(Scannable.Attr.CANCELLED)).isTrue();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void discardTryOnNextPredicateMiss() {
  List<Object> discarded = new ArrayList<>();
  CoreSubscriber<Integer> actual = new AssertSubscriber<>(Operators.enableOnDiscard(null, discarded::add));
  FluxFilter.FilterSubscriber<Integer> subscriber =
      new FluxFilter.FilterSubscriber<>(actual, i -> i % 2 == 0);
  subscriber.onSubscribe(Operators.emptySubscription());
  subscriber.tryOnNext(1);
  subscriber.tryOnNext(2);
  assertThat(discarded).containsExactly(1);
}

代码示例来源:origin: reactor/reactor-core

@Test
public void scanSubscriber() {
  CoreSubscriber<Integer> actual = new LambdaSubscriber<>(null, e -> {}, null, null);
  FluxTake.TakeSubscriber<Integer> test = new FluxTake.TakeSubscriber<>(actual, 5);
  Subscription parent = Operators.emptySubscription();
  test.onSubscribe(parent);
  Assertions.assertThat(test.scan(Scannable.Attr.PARENT)).isSameAs(parent);
  Assertions.assertThat(test.scan(Scannable.Attr.ACTUAL)).isSameAs(actual);
  Assertions.assertThat(test.scan(Scannable.Attr.TERMINATED)).isFalse();
  test.onComplete();
  Assertions.assertThat(test.scan(Scannable.Attr.TERMINATED)).isTrue();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void discardTryOnNextPredicateMiss() {
  List<Object> discarded = new ArrayList<>();
  CoreSubscriber<Integer> actual = new AssertSubscriber<>(
      Context.of(Hooks.KEY_ON_DISCARD, (Consumer<?>) discarded::add));
  FilterSubscriber<Integer> subscriber =
      new FilterSubscriber<>(actual, i -> i % 2 == 0);
  subscriber.onSubscribe(Operators.emptySubscription());
  subscriber.tryOnNext(1);
  subscriber.tryOnNext(2);
  assertThat(discarded).containsExactly(1);
}

代码示例来源:origin: reactor/reactor-core

@Test
public void scanMain() {
  Subscription s = Operators.emptySubscription();
  test.onSubscribe(s);
  assertThat(test.scan(Scannable.Attr.PARENT)).describedAs("PARENT").isSameAs(s);
  assertThat(test.scan(Scannable.Attr.TERMINATED)).describedAs("TERMINATED").isFalse();
  assertThat(test.scan(Scannable.Attr.CANCELLED)).describedAs("CANCELLED").isFalse();
  assertThat(test.scan(Scannable.Attr.ERROR)).describedAs("ERROR").isNull();
  assertThat(test.scan(Scannable.Attr.PREFETCH)).describedAs("PREFETCH").isEqualTo(Integer.MAX_VALUE);
}

代码示例来源:origin: reactor/reactor-core

@Test
public void failNextIfTerminatedTake() {
  Hooks.onNextDropped(t -> assertThat(t).isEqualTo(1));
  StepVerifier.create(Flux.from(s -> {
    s.onSubscribe(Operators.emptySubscription());
    s.onComplete();
    s.onNext(1);
  })
              .take(2))
        .verifyComplete();
  Hooks.resetOnNextDropped();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void scanSubscriber() {
  CoreSubscriber<String> actual = new LambdaSubscriber<>(null, e -> {}, null, null);
  FluxHide.HideSubscriber<String> test = new FluxHide.HideSubscriber<>(actual);
  Subscription parent = Operators.emptySubscription();
  test.onSubscribe(parent);
  assertThat(test.scan(Scannable.Attr.PARENT)).isSameAs(parent);
  assertThat(test.scan(Scannable.Attr.ACTUAL)).isSameAs(actual);
}

代码示例来源:origin: reactor/reactor-core

@Test
public void scanSubscriber() {
  CoreSubscriber<Integer> actual = new LambdaSubscriber<>(null, e -> {}, null, null);
  AssemblySnapshot snapshot = new AssemblySnapshot(null, Traces.callSiteSupplierFactory.get());
  FluxOnAssembly.OnAssemblySubscriber<Integer> test =
      new FluxOnAssembly.OnAssemblySubscriber<>(actual, snapshot, Flux.just(1));
  Subscription parent = Operators.emptySubscription();
  test.onSubscribe(parent);
  assertThat(test.scan(Scannable.Attr.PARENT)).isSameAs(parent);
  assertThat(test.scan(Scannable.Attr.ACTUAL)).isSameAs(actual);
}

代码示例来源:origin: reactor/reactor-core

@Test
public void scanSubscriber() {
  CoreSubscriber<String> actual = new LambdaMonoSubscriber<>(null, e -> {}, null, null);
  MonoIgnoreElements.IgnoreElementsSubscriber<String> test = new
      MonoIgnoreElements.IgnoreElementsSubscriber<>(actual);
  Subscription sub = Operators.emptySubscription();
  test.onSubscribe(sub);
  assertThat(test.scan(Scannable.Attr.PARENT)).isSameAs(sub);
  assertThat(test.scan(Scannable.Attr.ACTUAL)).isSameAs(actual);
}

代码示例来源:origin: reactor/reactor-core

@Test
public void ignoreDoubleComplete() {
  StepVerifier.create(Flux.from(s -> {
    s.onSubscribe(Operators.emptySubscription());
    s.onComplete();
    s.onComplete();
  }).flatMap(Flux::just))
        .verifyComplete();
}

代码示例来源:origin: reactor/reactor-core

@Test
  public void scanInner() {
    CoreSubscriber<Integer> actual = new LambdaSubscriber<>(null, e -> {}, null, sub -> sub.request(100));
    FluxRefCount<Integer> main = new FluxRefCount<Integer>(Flux.just(10).publish(), 17);
    FluxRefCount.RefCountInner<Integer> test = new FluxRefCount.RefCountInner<Integer>(actual, new FluxRefCount.RefCountMonitor<>(main));
    Subscription sub = Operators.emptySubscription();
    test.onSubscribe(sub);

    assertThat(test.scan(Scannable.Attr.PARENT)).isSameAs(sub);
    assertThat(test.scan(Scannable.Attr.ACTUAL)).isSameAs(actual);
  }
}

代码示例来源:origin: reactor/reactor-core

@Test
public void scanCancelFuseableMulticaster() {
  CoreSubscriber<Integer> actual = new LambdaSubscriber<>(null, e -> {}, null, null);
  FluxPublishMulticast.FluxPublishMulticaster<Integer> parent =
      new FluxPublishMulticast.FluxPublishMulticaster<>(123, Queues.<Integer>unbounded(), Context.empty());
  FluxPublishMulticast.CancelFuseableMulticaster<Integer> test =
      new FluxPublishMulticast.CancelFuseableMulticaster<>(actual, parent);
  Subscription sub = Operators.emptySubscription();
  test.onSubscribe(sub);
  assertThat(test.scan(Scannable.Attr.PARENT)).isSameAs(sub);
  assertThat(test.scan(Scannable.Attr.ACTUAL)).isSameAs(actual);
}

代码示例来源:origin: reactor/reactor-core

@Test
public void scanCancelInner() {
  CoreSubscriber<? super Integer> actual = new LambdaSubscriber<>(null, e -> {}, null, null);
  UsingWhenSubscriber<Integer, String> up = new UsingWhenSubscriber<>(actual, "RESOURCE", Mono::just, Mono::just, Mono::just, null);
  final Subscription parent = Operators.emptySubscription();
  up.onSubscribe(parent);
  FluxUsingWhen.CancelInner op = new FluxUsingWhen.CancelInner(up);
  assertThat(op.scan(Attr.PARENT)).as("PARENT").isSameAs(up);
  assertThat(op.scan(Attr.ACTUAL)).as("ACTUAL").isSameAs(up.actual);
  assertThat(op.scanUnsafe(Attr.PREFETCH)).as("PREFETCH not supported").isNull();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void assertOnSubscribeStateMainAndInner3() {
  StepVerifier.create(Flux.just(1)
              .hide()
              .flatMap(f -> Flux.from(s -> {
                s.onSubscribe(Operators.emptySubscription());
                s.onComplete();
                assertAfterOnCompleteInnerState2(((FluxFlatMap.FlatMapInner) s));
              }), 1), 1)
        .verifyComplete();
}

相关文章