Usage and code examples of the reactor.core.publisher.Operators.serialize() method

x33g5p2x, reposted on 2022-01-25 under "Other"

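This article collects code examples for the Java method reactor.core.publisher.Operators.serialize() and shows how Operators.serialize() is used in practice. The examples come mainly from GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they are useful as reference material. The details of Operators.serialize() are as follows:
Package: reactor.core.publisher
Class name: Operators
Method name: serialize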

Introduction to Operators.serialize

Safely gate a Subscriber by making sure onNext signals are delivered sequentially (serialized). Serialization uses thread-stealing and a potentially unbounded queue that might starve a calling thread if races are too important and Subscriber is slower. In other words, when several threads signal at the same time, one of them ends up delivering the queued signals on behalf of the others, and that thread can be kept busy for a long time if contention is heavy and the downstream Subscriber is slow.
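
The queue plus thread-stealing idea described above can be sketched in plain Java. This is a minimal illustration under stated assumptions, not Reactor's actual implementation: the names SerializedGateSketch, SerializedGate, wip, and run are hypothetical and introduced only for this example, and error, completion, and cancellation signals are left out.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

public class SerializedGateSketch {

    /** Hypothetical gate: forwards values to a non-thread-safe consumer one at a time. */
    static final class SerializedGate<T> {
        private final Consumer<? super T> downstream;
        // Unbounded queue: values pile up here while another thread is delivering.
        private final Queue<T> queue = new ConcurrentLinkedQueue<>();
        // Work-in-progress counter: number of values offered but not yet delivered.
        private final AtomicInteger wip = new AtomicInteger();

        SerializedGate(Consumer<? super T> downstream) {
            this.downstream = downstream;
        }

        void onNext(T value) {
            queue.offer(value);
            // Only the caller that moves wip from 0 to 1 becomes the drainer.
            if (wip.getAndIncrement() != 0) {
                return; // another thread is draining and will deliver this value
            }
            // Thread-stealing: this caller also delivers values queued by other threads.
            do {
                downstream.accept(queue.poll());
            } while (wip.decrementAndGet() != 0);
        }
    }

    /** Returns {number of delivered values, highest concurrency seen inside the consumer}. */
    static int[] run(int producers, int perProducer) {
        AtomicInteger inside = new AtomicInteger();
        AtomicInteger maxInside = new AtomicInteger();
        int[] delivered = {0}; // deliberately not thread-safe
        SerializedGate<Integer> gate = new SerializedGate<>(v -> {
            int now = inside.incrementAndGet();
            maxInside.accumulateAndGet(now, Math::max);
            delivered[0]++;
            inside.decrementAndGet();
        });
        ExecutorService pool = Executors.newFixedThreadPool(producers);
        CountDownLatch done = new CountDownLatch(producers);
        for (int p = 0; p < producers; p++) {
            pool.execute(() -> {
                for (int i = 0; i < perProducer; i++) {
                    gate.onNext(i);
                }
                done.countDown();
            });
        }
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for producers", e);
        } finally {
            pool.shutdown();
        }
        return new int[] {delivered[0], maxInside.get()};
    }

    public static void main(String[] args) {
        int[] r = run(4, 10_000);
        System.out.println("delivered=" + r[0] + ", maxConcurrent=" + r[1]);
    }
}
```

The drain loop also shows where the starvation warning comes from: the thread that wins the race keeps delivering for as long as other threads keep adding to the queue, so its own onNext call may not return for a while if the consumer is slow.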

Code examples

Code example source: reactor/reactor-core

TakeUntilMainSubscriber(CoreSubscriber<? super T> actual) {
  this.actual = Operators.serialize(actual);
}

Code example source: reactor/reactor-core

/**
 * Create a {@link FluxProcessor} that safely gates multi-threaded producer
 * {@link Subscriber#onNext(Object)}.
 *
 * @return a serializing {@link FluxProcessor}
 */
public final FluxProcessor<IN, OUT> serialize() {
  return new DelegateProcessor<>(this, Operators.serialize(this));
}

Code example source: reactor/reactor-core

SkipUntilMainSubscriber(CoreSubscriber<? super T> actual) {
  this.actual = Operators.serialize(actual);
  this.ctx = actual.currentContext();
}

Code example source: reactor/reactor-core

@Override
public void subscribe(CoreSubscriber<? super C> actual) {
  source.subscribe(new BufferTimeoutSubscriber<>(Operators.serialize(actual),
      batchSize,
      timespan,
      timer.createWorker(),
      bufferSupplier));
}

Code example source: reactor/reactor-core

@Override
public void subscribe(CoreSubscriber<? super R> actual) {
  CoreSubscriber<R> serial = Operators.serialize(actual);
  WithLatestFromSubscriber<T, U, R> main =
      new WithLatestFromSubscriber<>(serial, combiner);
  WithLatestFromOtherSubscriber<U> secondary =
      new WithLatestFromOtherSubscriber<>(main);
  other.subscribe(secondary);
  source.subscribe(main);
}

Code example source: reactor/reactor-core

static <T> void subscribe(CoreSubscriber<? super T> s,
    Function<? super Flux<Throwable>, ? extends Publisher<?>> whenSourceFactory,
    Publisher<? extends T> source) {
  RetryWhenOtherSubscriber other = new RetryWhenOtherSubscriber();
  Subscriber<Throwable> signaller = Operators.serialize(other.completionSignal);
  signaller.onSubscribe(Operators.emptySubscription());
  CoreSubscriber<T> serial = Operators.serialize(s);
  RetryWhenMainSubscriber<T> main =
      new RetryWhenMainSubscriber<>(serial, signaller, source);
  other.main = main;
  serial.onSubscribe(main);
  Publisher<?> p;
  try {
    p = Objects.requireNonNull(whenSourceFactory.apply(other),
        "The whenSourceFactory returned a null Publisher");
  }
  catch (Throwable e) {
    s.onError(Operators.onOperatorError(e, s.currentContext()));
    return;
  }
  p.subscribe(other);
  if (!main.cancelled) {
    source.subscribe(main);
  }
}

Code example source: reactor/reactor-core

FluxRepeatWhen.RepeatWhenOtherSubscriber other =
    new FluxRepeatWhen.RepeatWhenOtherSubscriber();
Subscriber<Long> signaller = Operators.serialize(other.completionSignal);
CoreSubscriber<T> serial = Operators.serialize(actual);

Code example source: reactor/reactor-core

@Override
public void subscribe(CoreSubscriber<? super T> actual) {
  RepeatWhenOtherSubscriber other = new RepeatWhenOtherSubscriber();
  Subscriber<Long> signaller = Operators.serialize(other.completionSignal);
  signaller.onSubscribe(Operators.emptySubscription());
  CoreSubscriber<T> serial = Operators.serialize(actual);
  RepeatWhenMainSubscriber<T> main =
      new RepeatWhenMainSubscriber<>(serial, signaller, source);
  other.main = main;
  serial.onSubscribe(main);
  Publisher<?> p;
  try {
    p = Objects.requireNonNull(whenSourceFactory.apply(other),
        "The whenSourceFactory returned a null Publisher");
  }
  catch (Throwable e) {
    actual.onError(Operators.onOperatorError(e, actual.currentContext()));
    return;
  }
  p.subscribe(other);
  if (!main.cancelled) {
    source.subscribe(main);
  }
}

Code example source: reactor/reactor-core

@Override
public void subscribe(CoreSubscriber<? super C> actual) {
  C buffer;
  try {
    buffer = Objects.requireNonNull(bufferSupplier.get(),
        "The bufferSupplier returned a null buffer");
  }
  catch (Throwable e) {
    Operators.error(actual, Operators.onOperatorError(e, actual.currentContext()));
    return;
  }
  BufferBoundaryMain<T, U, C> parent =
      new BufferBoundaryMain<>(
          source instanceof FluxInterval ?
              actual : Operators.serialize(actual),
          buffer, bufferSupplier);
  actual.onSubscribe(parent);
  other.subscribe(parent.other);
  source.subscribe(parent);
}

Code example source: reactor/reactor-core

@Override
public void subscribe(CoreSubscriber<? super T> actual) {
  CoreSubscriber<T> serial = Operators.serialize(actual);
  SampleMainSubscriber<T> main = new SampleMainSubscriber<>(serial);
  actual.onSubscribe(main);
  other.subscribe(new SampleOther<>(main));
  source.subscribe(main);
}

Code example source: reactor/reactor-core

@Override
public void subscribe(CoreSubscriber<? super T> actual) {
  CoreSubscriber<T> serial = Operators.serialize(actual);
  TimeoutMainSubscriber<T, V> main =
      new TimeoutMainSubscriber<>(serial, itemTimeout, other, timeoutDescription);
  serial.onSubscribe(main);
  TimeoutTimeoutSubscriber ts = new TimeoutTimeoutSubscriber(main, 0L);
  main.setTimeout(ts);
  firstTimeout.subscribe(ts);
  source.subscribe(main);
}

Code example source: reactor/reactor-core

@Override
@SuppressWarnings("unchecked")
public void subscribe(CoreSubscriber<? super T> actual) {
  CoreSubscriber<T> serial = Operators.serialize(actual);
  FluxTimeout.TimeoutMainSubscriber<T, V> main =
      new FluxTimeout.TimeoutMainSubscriber<>(serial, NEVER, other,
          addNameToTimeoutDescription(source, timeoutDescription));
  serial.onSubscribe(main);
  FluxTimeout.TimeoutTimeoutSubscriber ts =
      new FluxTimeout.TimeoutTimeoutSubscriber(main, 0L);
  main.setTimeout(ts);
  firstTimeout.subscribe(ts);
  source.subscribe(main);
}
