io.reactivex.Flowable.bufferSize()方法的使用及代码示例

x33g5p2x  于2022-01-19 转载在 其他  
字(7.1k)|赞(0)|评价(0)|浏览(165)

本文整理了Java中io.reactivex.Flowable.bufferSize()方法的一些代码示例,展示了Flowable.bufferSize()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Flowable.bufferSize()方法的具体详情如下:
包路径:io.reactivex.Flowable
类名称:Flowable
方法名:bufferSize

Flowable.bufferSize介绍

[英]Returns the default internal buffer size used by most async operators.

The value can be overridden via system parameter rx2.buffer-sizebefore the Flowable class is loaded.
[中]返回大多数异步运算符使用的默认内部缓冲区大小。
该值可通过系统参数rx2覆盖。加载可流动类之前的缓冲区大小*。

代码示例

代码示例来源:origin: ReactiveX/RxJava

SpscLinkedArrayQueue<R> getOrCreateQueue() {
  for (;;) {
    SpscLinkedArrayQueue<R> current = queue.get();
    if (current != null) {
      return current;
    }
    current = new SpscLinkedArrayQueue<R>(Flowable.bufferSize());
    if (queue.compareAndSet(null, current)) {
      return current;
    }
  }
}

代码示例来源:origin: ReactiveX/RxJava

SimplePlainQueue<T> getOrCreateQueue() {
  SimplePlainQueue<T> q = queue;
  if (q == null) {
    q = new SpscArrayQueue<T>(bufferSize());
    queue = q;
  }
  return q;
}

代码示例来源:origin: ReactiveX/RxJava

@Override
  public Publisher<? extends R> apply(List<Publisher<? extends T>> list) {
    return Flowable.zipIterable(list, zipper, false, Flowable.bufferSize());
  }
}

代码示例来源:origin: ReactiveX/RxJava

SpscLinkedArrayQueue<R> getOrCreateQueue() {
  for (;;) {
    SpscLinkedArrayQueue<R> current = queue.get();
    if (current != null) {
      return current;
    }
    current = new SpscLinkedArrayQueue<R>(Flowable.bufferSize());
    if (queue.compareAndSet(null, current)) {
      return current;
    }
  }
}

代码示例来源:origin: ReactiveX/RxJava

@Override
  public Object apply(Integer i) {
    return i % (Flowable.bufferSize() + 2);
  }
})

代码示例来源:origin: ReactiveX/RxJava

MergeWithObserver(Subscriber<? super T> downstream) {
  this.downstream = downstream;
  this.mainSubscription = new AtomicReference<Subscription>();
  this.otherObserver = new OtherObserver<T>(this);
  this.error = new AtomicThrowable();
  this.requested = new AtomicLong();
  this.prefetch = bufferSize();
  this.limit = prefetch - (prefetch >> 2);
}

代码示例来源:origin: ReactiveX/RxJava

MergeWithObserver(Subscriber<? super T> downstream) {
  this.downstream = downstream;
  this.mainSubscription = new AtomicReference<Subscription>();
  this.otherObserver = new OtherObserver<T>(this);
  this.error = new AtomicThrowable();
  this.requested = new AtomicLong();
  this.prefetch = bufferSize();
  this.limit = prefetch - (prefetch >> 2);
}

代码示例来源:origin: ReactiveX/RxJava

/**
 * Take a Publisher and prepare to consume it on multiple 'rails' (number of CPUs)
 * in a round-robin fashion.
 * @param <T> the value type
 * @param source the source Publisher
 * @return the ParallelFlowable instance
 */
@CheckReturnValue
public static <T> ParallelFlowable<T> from(@NonNull Publisher<? extends T> source) {
  return from(source, Runtime.getRuntime().availableProcessors(), Flowable.bufferSize());
}

代码示例来源:origin: ReactiveX/RxJava

/**
 * Take a Publisher and prepare to consume it on parallelism number of 'rails' in a round-robin fashion.
 * @param <T> the value type
 * @param source the source Publisher
 * @param parallelism the number of parallel rails
 * @return the new ParallelFlowable instance
 */
@CheckReturnValue
public static <T> ParallelFlowable<T> from(@NonNull Publisher<? extends T> source, int parallelism) {
  return from(source, parallelism, Flowable.bufferSize());
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testNoBackpressure() {
  ArrayList<Long> list = new ArrayList<Long>(Flowable.bufferSize() * 2);
  for (long i = 1; i <= Flowable.bufferSize() * 2 + 1; i++) {
    list.add(i);
  }
  Observable<Long> o = Observable.rangeLong(1, list.size());
  TestObserver<Long> to = new TestObserver<Long>();
  o.subscribe(to);
  to.assertValueSequence(list);
  to.assertTerminated();
}

代码示例来源:origin: ReactiveX/RxJava

@Override
  public void onNext(Integer t) {
    super.onNext(t);
    if (t == 1) {
      for (int i = 0; i < Flowable.bufferSize() - 1; i++) {
        pp.onNext(i + 2);
      }
    }
  }
});

代码示例来源:origin: ReactiveX/RxJava

@Override
  public Flowable<Integer> apply(Integer t) {
    return Flowable.range(1, Flowable.bufferSize() * 2)
        .doOnNext(new Consumer<Integer>() {
          @Override
          public void accept(Integer t) {
            count.getAndIncrement();
          }
        }).hide();
  }
}).subscribe(ts);

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testSkipLastWithBackpressure() {
  Flowable<Integer> f = Flowable.range(0, Flowable.bufferSize() * 2).skipLast(Flowable.bufferSize() + 10);
  TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
  f.observeOn(Schedulers.computation()).subscribe(ts);
  ts.awaitTerminalEvent();
  ts.assertNoErrors();
  assertEquals((Flowable.bufferSize()) - 10, ts.valueCount());
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testObserveOn() {
  int num = (int) (Flowable.bufferSize() * 2.1);
  AtomicInteger c = new AtomicInteger();
  TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
  incrementingIntegers(c).observeOn(Schedulers.computation()).take(num).subscribe(ts);
  ts.awaitTerminalEvent();
  ts.assertNoErrors();
  System.out.println("testObserveOn => Received: " + ts.valueCount() + "  Emitted: " + c.get());
  assertEquals(num, ts.valueCount());
  assertTrue(c.get() < Flowable.bufferSize() * 4);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testBackpressure() {
  TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
  Flowable.range(0, Flowable.bufferSize() * 2)
      .ambWith(Flowable.range(0, Flowable.bufferSize() * 2))
      .observeOn(Schedulers.computation()) // observeOn has a backpressured RxRingBuffer
      .delay(1, TimeUnit.MICROSECONDS) // make it a slightly slow consumer
      .subscribe(ts);
  ts.awaitTerminalEvent();
  ts.assertNoErrors();
  assertEquals(Flowable.bufferSize() * 2, ts.values().size());
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testInnerBackpressureWithAlignedBoundaries() {
  TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
  Flowable.range(0, Flowable.bufferSize() * 2)
      .concatWith(Flowable.range(0, Flowable.bufferSize() * 2))
      .observeOn(Schedulers.computation()) // observeOn has a backpressured RxRingBuffer
      .subscribe(ts);
  ts.awaitTerminalEvent();
  ts.assertNoErrors();
  assertEquals(Flowable.bufferSize() * 4, ts.valueCount());
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testInnerBackpressureWithoutAlignedBoundaries() {
  TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
  Flowable.range(0, (Flowable.bufferSize() * 2) + 10)
      .concatWith(Flowable.range(0, (Flowable.bufferSize() * 2) + 10))
      .observeOn(Schedulers.computation()) // observeOn has a backpressured RxRingBuffer
      .subscribe(ts);
  ts.awaitTerminalEvent();
  ts.assertNoErrors();
  assertEquals((Flowable.bufferSize() * 4) + 20, ts.valueCount());
}

代码示例来源:origin: ReactiveX/RxJava

@Test(timeout = 500)
public void testWithObserveOn() throws InterruptedException {
  TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
  Flowable.range(0, Flowable.bufferSize() * 10).onBackpressureDrop().observeOn(Schedulers.io()).subscribe(ts);
  ts.awaitTerminalEvent();
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testBackpressure2() {
  TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
  Flowable.range(1, 100000).takeLast(Flowable.bufferSize() * 4)
  .observeOn(Schedulers.newThread()).map(newSlowProcessor()).subscribe(ts);
  ts.awaitTerminalEvent();
  ts.assertNoErrors();
  assertEquals(Flowable.bufferSize() * 4, ts.valueCount());
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void longSequenceEquals() {
  Flowable<Integer> source = Flowable.range(1, Flowable.bufferSize() * 4).subscribeOn(Schedulers.computation());
  Flowable.sequenceEqual(source, source)
  .test()
  .awaitDone(5, TimeUnit.SECONDS)
  .assertResult(true);
}

相关文章

Flowable类方法