reactor.core.publisher.Operators.unboundedOrLimit()方法的使用及代码示例

x33g5p2x  于2022-01-25 转载在 其他  
字(7.3k)|赞(0)|评价(0)|浏览(162)

本文整理了 Java 中 reactor.core.publisher.Operators.unboundedOrLimit() 方法的一些代码示例,展示了 Operators.unboundedOrLimit() 的具体用法。这些代码示例主要来源于 GitHub、Stack Overflow、Maven 等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。Operators.unboundedOrLimit() 方法的具体详情如下:
包路径:reactor.core.publisher.Operators
类名称:Operators
方法名:unboundedOrLimit

Operators.unboundedOrLimit介绍

暂无

代码示例

代码示例来源:origin: reactor/reactor-core

MergeSequentialInner(MergeSequentialMain<?, R> parent, int prefetch) {
  this.parent = parent;
  this.prefetch = prefetch;
  this.limit = Operators.unboundedOrLimit(prefetch);
}

代码示例来源:origin: reactor/reactor-core

SwitchMapInner(SwitchMapMain<?, R> parent, int prefetch, long index) {
  this.parent = parent;
  this.prefetch = prefetch;
  this.limit = Operators.unboundedOrLimit(prefetch);
  this.index = index;
}

代码示例来源:origin: reactor/reactor-core

FlattenIterableSubscriber(CoreSubscriber<? super R> actual,
    Function<? super T, ? extends Iterable<? extends R>> mapper,
    int prefetch,
    Supplier<Queue<T>> queueSupplier) {
  this.actual = actual;
  this.mapper = mapper;
  this.prefetch = prefetch;
  this.queueSupplier = queueSupplier;
  this.limit = Operators.unboundedOrLimit(prefetch);
}

代码示例来源:origin: reactor/reactor-core

/**
 * Chooses a limit value from a prefetch amount and a low-tide value.
 *
 * <ul>
 *   <li>{@code lowTide <= 0}: returns {@code prefetch} unchanged.</li>
 *   <li>{@code 0 < lowTide < prefetch}: returns {@code Integer.MAX_VALUE} when
 *       {@code prefetch} is {@code Integer.MAX_VALUE}, otherwise {@code lowTide}.</li>
 *   <li>{@code lowTide >= prefetch}: delegates to {@code unboundedOrLimit(prefetch)}.</li>
 * </ul>
 *
 * @param prefetch the prefetch amount
 * @param lowTide the low-tide value
 * @return the limit selected by the rules above
 */
static int unboundedOrLimit(int prefetch, int lowTide) {
  if (lowTide <= 0) {
    return prefetch;
  }
  if (lowTide < prefetch) {
    // A positive lowTide below prefetch wins, unless prefetch is MAX_VALUE.
    if (prefetch == Integer.MAX_VALUE) {
      return Integer.MAX_VALUE;
    }
    return lowTide;
  }
  // lowTide is at least prefetch: fall back to the single-argument computation.
  return unboundedOrLimit(prefetch);
}

代码示例来源:origin: reactor/reactor-core

CombineLatestInner(CombineLatestCoordinator<T, ?> parent,
    int index,
    int prefetch) {
  this.parent = parent;
  this.index = index;
  this.prefetch = prefetch;
  this.limit = Operators.unboundedOrLimit(prefetch);
}

代码示例来源:origin: reactor/reactor-core

ZipInner(ZipCoordinator<T, ?> parent,
    int prefetch,
    int index,
    Supplier<? extends Queue<T>> queueSupplier) {
  this.parent = parent;
  this.prefetch = prefetch;
  this.index = index;
  this.queueSupplier = queueSupplier;
  this.limit = Operators.unboundedOrLimit(prefetch);
}

代码示例来源:origin: reactor/reactor-core

FlatMapInner(FlatMapMain<?, R> parent, int prefetch) {
      this.parent = parent;
      this.prefetch = prefetch;
//            this.limit = prefetch >> 2;
      this.limit = Operators.unboundedOrLimit(prefetch);
    }

代码示例来源:origin: reactor/reactor-core

MergeSequentialInner(MergeSequentialMain<T> parent, int prefetch) {
  this.parent = parent;
  this.prefetch = prefetch ;
  this.limit = Operators.unboundedOrLimit(prefetch);
}

代码示例来源:origin: reactor/reactor-core

PublishOnSubscriber(CoreSubscriber<? super T> actual,
    Scheduler scheduler,
    Worker worker,
    boolean delayError,
    int prefetch,
    int lowTide,
    Supplier<? extends Queue<T>> queueSupplier) {
  this.actual = actual;
  this.worker = worker;
  this.scheduler = scheduler;
  this.delayError = delayError;
  this.prefetch = prefetch;
  this.queueSupplier = queueSupplier;
  this.limit = Operators.unboundedOrLimit(prefetch, lowTide);
}

代码示例来源:origin: reactor/reactor-core

PublishOnConditionalSubscriber(ConditionalSubscriber<? super T> actual,
    Scheduler scheduler,
    Worker worker,
    boolean delayError,
    int prefetch,
    int lowTide,
    Supplier<? extends Queue<T>> queueSupplier) {
  this.actual = actual;
  this.worker = worker;
  this.scheduler = scheduler;
  this.delayError = delayError;
  this.prefetch = prefetch;
  this.queueSupplier = queueSupplier;
  this.limit = Operators.unboundedOrLimit(prefetch, lowTide);
}

代码示例来源:origin: reactor/reactor-core

SubscriberIterator(Queue<T> queue, int batchSize) {
  this.queue = queue;
  this.batchSize = batchSize;
  this.limit = Operators.unboundedOrLimit(batchSize);
  this.lock = new ReentrantLock();
  this.condition = lock.newCondition();
}

代码示例来源:origin: reactor/reactor-core

ParallelSourceMain(CoreSubscriber<? super T>[] subscribers, int prefetch,
    Supplier<Queue<T>> queueSupplier) {
  this.subscribers = subscribers;
  this.prefetch = prefetch;
  this.queueSupplier = queueSupplier;
  this.limit = Operators.unboundedOrLimit(prefetch);
  this.requests = new AtomicLongArray(subscribers.length);
  this.emissions = new long[subscribers.length];
}

代码示例来源:origin: reactor/reactor-core

UnicastGroupedFlux(K key,
    Queue<V> queue,
    GroupByMain<?, K, V> parent,
    int prefetch) {
  this.key = key;
  this.queue = queue;
  this.context = parent.currentContext();
  this.parent = parent;
  this.limit = Operators.unboundedOrLimit(prefetch);
}

代码示例来源:origin: reactor/reactor-core

/**
 * Initializes this instance with its prefetch amount, queue supplier and context, and sets the
 * subscriber state to {@code EMPTY}.
 *
 * @param prefetch the prefetch amount; stored as-is and also passed to
 *     {@code Operators.unboundedOrLimit(int)} to compute {@code limit}
 * @param queueSupplier supplier of queues of {@code T}, stored as-is
 * @param ctx the {@code Context}, stored as-is into {@code context}
 */
@SuppressWarnings("unchecked")
FluxPublishMulticaster(int prefetch,
    Supplier<? extends Queue<T>> queueSupplier,
    Context ctx) {
  this.prefetch = prefetch;
  this.limit = Operators.unboundedOrLimit(prefetch);
  this.queueSupplier = queueSupplier;
  // NOTE(review): SUBSCRIBERS is presumably an atomic field updater declared on the
  // enclosing class; lazySet installs EMPTY as the initial value -- confirm there.
  SUBSCRIBERS.lazySet(this, EMPTY);
  this.context = ctx;
}

代码示例来源:origin: reactor/reactor-core

FlatMapMain(CoreSubscriber<? super R> actual,
    Function<? super T, ? extends Publisher<? extends R>> mapper,
    boolean delayError,
    int maxConcurrency,
    Supplier<? extends Queue<R>> mainQueueSupplier,
    int prefetch,
    Supplier<? extends Queue<R>> innerQueueSupplier) {
  this.actual = actual;
  this.ctx = actual.currentContext();
  this.mapper = mapper;
  this.delayError = delayError;
  this.maxConcurrency = maxConcurrency;
  this.mainQueueSupplier = mainQueueSupplier;
  this.prefetch = prefetch;
  this.innerQueueSupplier = innerQueueSupplier;
  this.limit = Operators.unboundedOrLimit(maxConcurrency);
}

代码示例来源:origin: reactor/reactor-core

ConcatMapImmediate(CoreSubscriber<? super R> actual,
    Function<? super T, ? extends Publisher<? extends R>> mapper,
    Supplier<? extends Queue<T>> queueSupplier, int prefetch) {
  this.actual = actual;
  this.ctx = actual.currentContext();
  this.mapper = mapper;
  this.queueSupplier = queueSupplier;
  this.prefetch = prefetch;
  this.limit = Operators.unboundedOrLimit(prefetch);
  this.inner = new ConcatMapInner<>(this);
}

代码示例来源:origin: reactor/reactor-core

ConcatMapDelayed(CoreSubscriber<? super R> actual,
    Function<? super T, ? extends Publisher<? extends R>> mapper,
    Supplier<? extends Queue<T>> queueSupplier,
    int prefetch, boolean veryEnd) {
  this.actual = actual;
  this.ctx = actual.currentContext();
  this.mapper = mapper;
  this.queueSupplier = queueSupplier;
  this.prefetch = prefetch;
  this.limit = Operators.unboundedOrLimit(prefetch);
  this.veryEnd = veryEnd;
  this.inner = new ConcatMapInner<>(this);
}

代码示例来源:origin: reactor/reactor-core

/**
 * Checks the single-argument {@code Operators.unboundedOrLimit}: a prefetch of 100 is expected
 * to give 75, and {@code Integer.MAX_VALUE} is expected to be returned unchanged.
 */
@Test
public void unboundedOrLimit() {
  final int boundedResult = Operators.unboundedOrLimit(100);
  assertThat(boundedResult).as("prefetch - (prefetch >> 2)").isEqualTo(75);

  final int unboundedResult = Operators.unboundedOrLimit(Integer.MAX_VALUE);
  assertThat(unboundedResult).as("unbounded").isEqualTo(Integer.MAX_VALUE);
}

代码示例来源:origin: reactor/reactor-core

/**
 * Checks the two-argument {@code Operators.unboundedOrLimit(prefetch, lowTide)} across
 * low-tide values that are equal to, larger than, smaller than the prefetch, zero, and negative,
 * for both a bounded prefetch (100) and {@code Integer.MAX_VALUE}.
 */
@Test
public void unboundedOrLimitLowTide() {
  final int max = Integer.MAX_VALUE;

  // lowTide at or above prefetch.
  assertThat(Operators.unboundedOrLimit(100, 100)).as("same lowTide").isEqualTo(75);
  assertThat(Operators.unboundedOrLimit(100, 110)).as("too big lowTide").isEqualTo(75);
  assertThat(Operators.unboundedOrLimit(max, max)).as("MAX_VALUE and same lowTide").isEqualTo(max);

  // Positive lowTide below prefetch.
  assertThat(Operators.unboundedOrLimit(100, 20)).as("smaller lowTide").isEqualTo(20);
  assertThat(Operators.unboundedOrLimit(max, 110)).as("smaller lowTide and MAX_VALUE").isEqualTo(max);

  // Zero lowTide.
  assertThat(Operators.unboundedOrLimit(100, 0)).as("0 lowTide and 100").isEqualTo(100);
  assertThat(Operators.unboundedOrLimit(max, 0)).as("0 lowTide and MAX_VALUE").isEqualTo(max);

  // Negative lowTide.
  assertThat(Operators.unboundedOrLimit(100, -1)).as("-1 lowTide and 100").isEqualTo(100);
  assertThat(Operators.unboundedOrLimit(max, -1)).as("-1 lowTide and MAX_VALUE").isEqualTo(max);
}

代码示例来源:origin: reactor/reactor-core

// Excerpt from a larger method body that is not shown here.
// Limit computed from bufferSize via Operators.unboundedOrLimit.
int limit = Operators.unboundedOrLimit(bufferSize);
// Copy emitted and consumerIndex into locals.
// NOTE(review): both are presumably fields of the enclosing class -- confirm.
long e = emitted;
long ci = consumerIndex;

相关文章