Usage and code examples for the io.reactivex.Flowable class

Reposted by x33g5p2x on 2022-01-19, category: Other

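This article collects code examples for the Java class io.reactivex.Flowable and shows how the class is used in practice. The examples come mainly from GitHub, Stack Overflow, Maven and similar platforms, and were extracted from selected projects, so they should be a useful reference. Details of the Flowable class:
Fully qualified name: io.reactivex.Flowable
Class name: Flowable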

Introduction to Flowable

The Flowable class implements the Reactive-Streams pattern and offers factory methods, intermediate operators and the ability to consume reactive dataflows.

Reactive-Streams operates with Publishers which Flowable extends. Many operators therefore accept general Publishers directly and allow direct interoperation with other Reactive-Streams implementations.
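
The demand-driven contract between a Publisher and its Subscriber can be sketched without RxJava on the classpath, because the JDK's java.util.concurrent.Flow interfaces (Java 9 and later) have the same shape as the Reactive-Streams ones. The following is only a minimal single-threaded sketch: RangePublisher and RecordingSubscriber are hypothetical names invented for this illustration, not RxJava classes, and a real operator would also have to handle concurrency.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

public class DemandSketch {

    // Hypothetical single-threaded publisher: emits start, start + 1, ...
    // but never more items than the subscriber has requested.
    static final class RangePublisher implements Flow.Publisher<Integer> {
        private final int start;
        private final int count;

        RangePublisher(int start, int count) {
            this.start = start;
            this.count = count;
        }

        @Override
        public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
            subscriber.onSubscribe(new Flow.Subscription() {
                private int emitted;       // items delivered so far
                private long requested;    // outstanding demand
                private boolean emitting;  // guards against re-entrant request()
                private boolean done;

                @Override
                public void request(long n) {
                    if (done) {
                        return;
                    }
                    if (n <= 0) {
                        done = true;
                        subscriber.onError(new IllegalArgumentException("n must be positive"));
                        return;
                    }
                    // Accumulate demand, capping at Long.MAX_VALUE on overflow.
                    requested = requested + n < 0 ? Long.MAX_VALUE : requested + n;
                    if (emitting) {
                        return; // the loop further up the stack sees the new demand
                    }
                    emitting = true;
                    while (!done && requested > 0 && emitted < count) {
                        requested--;
                        subscriber.onNext(start + emitted++);
                    }
                    emitting = false;
                    if (!done && emitted == count) {
                        done = true;
                        subscriber.onComplete();
                    }
                }

                @Override
                public void cancel() {
                    done = true;
                }
            });
        }
    }

    // Hypothetical subscriber: requests a fixed amount once and records events.
    static final class RecordingSubscriber implements Flow.Subscriber<Integer> {
        final List<Integer> values = new ArrayList<>();
        boolean completed;
        Throwable error;
        private final long initialRequest;

        RecordingSubscriber(long initialRequest) {
            this.initialRequest = initialRequest;
        }

        @Override
        public void onSubscribe(Flow.Subscription s) {
            if (initialRequest > 0) {
                s.request(initialRequest);
            }
        }

        @Override public void onNext(Integer item) { values.add(item); }
        @Override public void onError(Throwable t) { error = t; }
        @Override public void onComplete() { completed = true; }
    }

    public static void main(String[] args) {
        RecordingSubscriber limited = new RecordingSubscriber(2);
        new RangePublisher(1, 5).subscribe(limited);
        // Prints "[1, 2] completed=false": only the requested items arrive.
        System.out.println(limited.values + " completed=" + limited.completed);
    }
}
```

A subscriber that requests Long.MAX_VALUE instead would receive all five items followed by onComplete.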

Flowable holds the default operator buffer size of 128 elements, accessible via Flowable.bufferSize(), which can be overridden globally through the system property rx2.buffer-size. Most operators, however, have overloads that allow setting their internal buffer size explicitly.
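
As a rough illustration of the buffer-size paragraph above, the sketch below resolves a default buffer size from the rx2.buffer-size system property using plain JDK calls. BufferSizeSketch and resolveBufferSize are hypothetical names, and the floor of 1 is an assumption of this sketch about how too-small values would be treated. RxJava is not needed to run it.

```java
public class BufferSizeSketch {

    // Hypothetical helper: reads the "rx2.buffer-size" system property,
    // falls back to 128 when it is unset or not a number, and (as an
    // assumption of this sketch) never returns less than 1.
    static int resolveBufferSize() {
        return Math.max(1, Integer.getInteger("rx2.buffer-size", 128));
    }

    public static void main(String[] args) {
        // Prints 128 unless the JVM was started with -Drx2.buffer-size=<n>.
        System.out.println(resolveBufferSize());
    }
}
```

If a library caches such a value when its class is first loaded, the property has to be set before that point, for example with -Drx2.buffer-size=256 on the command line.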

The documentation for this class makes use of marble diagrams.

[Image: marble diagram legend]

For more information see the ReactiveX documentation.

Code examples

Code example source: ReactiveX/RxJava

@Test
public void testFirstOrElseWithPredicateOfSomeFlowable() {
  Flowable<String> src = Flowable.just("a", "b", "c", "d", "e", "f");
  src.filter(IS_D).first("default").toFlowable().subscribe(w);
  verify(w, times(1)).onNext(anyString());
  verify(w, times(1)).onNext("d");
  verify(w, never()).onError(any(Throwable.class));
  verify(w, times(1)).onComplete();
}

Code example source: ReactiveX/RxJava

@Test
public void testBackpressure2() {
  TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
  Flowable.range(1, 100000).takeLast(Flowable.bufferSize() * 4)
  .observeOn(Schedulers.newThread()).map(newSlowProcessor()).subscribe(ts);
  ts.awaitTerminalEvent();
  ts.assertNoErrors();
  assertEquals(Flowable.bufferSize() * 4, ts.valueCount());
}

Code example source: ReactiveX/RxJava

@Test(timeout = 2000)
public void testRepeatTake() {
  Flowable<Integer> xs = Flowable.just(1, 2);
  Object[] ys = xs.repeat().subscribeOn(Schedulers.newThread()).take(4).toList().blockingGet().toArray();
  assertArrayEquals(new Object[] { 1, 2, 1, 2 }, ys);
}

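Code example source: ReactiveX/RxJava

// Fragment: the tail of an anonymous Function passed to an operator call.
// The start of that call, and the operator that receives the trailing
// `true` flag, are not part of this excerpt.
@Override
public Flowable<Integer> apply(Integer v) throws Exception {
  // Every inner Flowable fails on its first item.
  return Flowable.range(1, 2).map(new Function<Integer, Integer>() {
    @Override
    public Integer apply(Integer w) throws Exception {
      throw new TestException();
    }
  });
}
}, true)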

Code example source: ReactiveX/RxJava

// iterate(...) is a helper of the enclosing class, which is not shown in this excerpt.
@SuppressWarnings({ "rawtypes", "unchecked" })
@Override
public Publisher<List<Long>> createPublisher(long elements) {
  return
    Flowable.fromIterable(iterate(elements))
    .window(Flowable.just(1).concatWith(Flowable.<Integer>never()))
    .onBackpressureBuffer()
    .flatMap((Function)Functions.identity())
  ;
}

Code example source: ReactiveX/RxJava

// iterate(...) is a helper of the enclosing class, which is not shown in this excerpt.
@Override
public Publisher<Long> createPublisher(long elements) {
  return
    Flowable.switchOnNext(Flowable.just(
        Flowable.fromIterable(iterate(elements)))
    )
  ;
}

Code example source: ReactiveX/RxJava

@Test
public void normalError() {
  Flowable.create(source, BackpressureStrategy.ERROR).subscribe(ts);
  source.onNext(1);
  source.onNext(2);
  source.onComplete();
  ts.assertNoValues();
  ts.assertError(MissingBackpressureException.class);
  ts.assertNotComplete();
  Assert.assertEquals("create: could not emit value due to lack of requests", ts.errors().get(0).getMessage());
}
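
The test above depends on fields of its test class (source, ts) that are not shown. To illustrate the behaviour it asserts, namely that a value pushed while no demand is outstanding ends the stream with an error, here is a minimal JDK-only sketch. ErrorOnOverflowEmitter and Recorder are hypothetical names, java.util.concurrent.Flow stands in for the Reactive-Streams interfaces, and IllegalStateException stands in for RxJava's MissingBackpressureException. It is not the library's implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

public class ErrorStrategySketch {

    // Hypothetical emitter: forwards a value only while demand is outstanding,
    // otherwise terminates the subscriber with an error.
    static final class ErrorOnOverflowEmitter<T> implements Flow.Subscription {
        private final Flow.Subscriber<? super T> downstream;
        private long requested;
        private boolean done;

        private ErrorOnOverflowEmitter(Flow.Subscriber<? super T> downstream) {
            this.downstream = downstream;
        }

        static <T> ErrorOnOverflowEmitter<T> attach(Flow.Subscriber<? super T> downstream) {
            ErrorOnOverflowEmitter<T> emitter = new ErrorOnOverflowEmitter<>(downstream);
            downstream.onSubscribe(emitter);
            return emitter;
        }

        void onNext(T value) {
            if (done) {
                return;
            }
            if (requested > 0) {
                requested--;
                downstream.onNext(value);
            } else {
                // No demand: signal the overflow instead of buffering or dropping.
                done = true;
                downstream.onError(
                    new IllegalStateException("could not emit value due to lack of requests"));
            }
        }

        void onComplete() {
            if (!done) {
                done = true;
                downstream.onComplete();
            }
        }

        @Override
        public void request(long n) {
            if (n > 0) {
                // Accumulate demand, capping at Long.MAX_VALUE on overflow.
                requested = requested + n < 0 ? Long.MAX_VALUE : requested + n;
            }
        }

        @Override
        public void cancel() {
            done = true;
        }
    }

    // Hypothetical subscriber that requests a fixed amount once and records events.
    static final class Recorder<T> implements Flow.Subscriber<T> {
        final List<T> values = new ArrayList<>();
        Throwable error;
        boolean completed;
        private final long initialRequest;

        Recorder(long initialRequest) {
            this.initialRequest = initialRequest;
        }

        @Override
        public void onSubscribe(Flow.Subscription s) {
            if (initialRequest > 0) {
                s.request(initialRequest);
            }
        }

        @Override public void onNext(T item) { values.add(item); }
        @Override public void onError(Throwable t) { error = t; }
        @Override public void onComplete() { completed = true; }
    }

    public static void main(String[] args) {
        Recorder<Integer> recorder = new Recorder<>(0); // requests nothing
        ErrorOnOverflowEmitter<Integer> source = ErrorOnOverflowEmitter.attach(recorder);
        source.onNext(1);
        source.onNext(2);
        source.onComplete();
        // Prints "[] error=true completed=false".
        System.out.println(recorder.values
            + " error=" + (recorder.error != null)
            + " completed=" + recorder.completed);
    }
}
```

With an initial request of 1, the first value would be delivered and the second would trigger the error.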

Code example source: ReactiveX/RxJava

@Test
public void nonFusedCancelAfterRequestConditional2() {
  final TestSubscriber<Integer> ts = new TestSubscriber<Integer>(2L);
  Flowable.range(1, 2).hide()
  .observeOn(Schedulers.single())
  .filter(Functions.alwaysTrue())
  .subscribe(ts);
  ts
  .awaitDone(5, TimeUnit.SECONDS)
  .assertResult(1, 2);
}

Code example source: ReactiveX/RxJava

// DisposeTrackingScheduler and TestSubscriberFusedCanceling are helpers of
// the enclosing test class, which is not shown in this excerpt.
@Test
public void workerNotDisposedPrematurelyNormalInAsyncOutConditional() {
  DisposeTrackingScheduler s = new DisposeTrackingScheduler();

  TestSubscriber<Integer> ts = new TestSubscriberFusedCanceling();

  Flowable.just(1).hide().observeOn(s).filter(Functions.alwaysTrue()).subscribe(ts);

  assertEquals(1, s.disposedCount.get());
}

Code example source: ReactiveX/RxJava

@Test
public void innerEscapeCompleted() {
  Flowable<Integer> source = Flowable.just(0);
  Flowable<Integer> m = source.groupBy(identity, dbl).flatMap(FLATTEN_INTEGER);
  TestSubscriber<Object> ts = new TestSubscriber<Object>();
  m.subscribe(ts);
  ts.awaitTerminalEvent();
  ts.assertNoErrors();
  System.out.println(ts.values());
}

Code example source: ReactiveX/RxJava

@Test
public void workerNotDisposedPrematurelyNormalInNormalOutConditional() {
  DisposeTrackingScheduler s = new DisposeTrackingScheduler();
  Flowable.concat(
      Flowable.just(1).hide().observeOn(s).filter(Functions.alwaysTrue()),
      Flowable.just(2)
  )
  .test()
  .assertResult(1, 2);
  assertEquals(1, s.disposedCount.get());
}

Code example source: ReactiveX/RxJava

@Test
public void testListFlowable() {
  Flowable<String> w = Flowable.fromIterable(Arrays.asList("one", "two", "three"));
  Flowable<List<String>> flowable = w.toList().toFlowable();
  Subscriber<List<String>> subscriber = TestHelper.mockSubscriber();
  flowable.subscribe(subscriber);
  verify(subscriber, times(1)).onNext(Arrays.asList("one", "two", "three"));
  verify(subscriber, Mockito.never()).onError(any(Throwable.class));
  verify(subscriber, times(1)).onComplete();
}

Code example source: ReactiveX/RxJava

@Test
public void testTake2() {
  Flowable<Integer> f = Flowable.just(1, 2, 3, 4, 5);
  Iterable<String> it = Arrays.asList("a", "b", "c", "d", "e");
  SquareStr squareStr = new SquareStr();
  f.map(squareStr).zipWith(it, concat2Strings).take(2).subscribe(printer);
  assertEquals(2, squareStr.counter.get());
}

Code example source: ReactiveX/RxJava

@Test
public void testErrorDelayedAsync() {
  Flowable<Integer> source = Flowable.just(1, 2, 3)
      .concatWith(Flowable.<Integer>error(new TestException()));
  TestSubscriber<Integer> ts = TestSubscriber.create();
  source.observeOn(Schedulers.computation(), true).subscribe(ts);
  ts.awaitTerminalEvent(2, TimeUnit.SECONDS);
  ts.assertValues(1, 2, 3);
  ts.assertError(TestException.class);
  ts.assertNotComplete();
}

Code example source: ReactiveX/RxJava

@Test
public void testError2() {
  Flowable<Integer> source = Flowable.concat(Flowable.just(0),
      Flowable.<Integer> error(new TestException("Forced failure")));
  Flowable<Integer> m = source.groupBy(identity, dbl).flatMap(FLATTEN_INTEGER);
  TestSubscriber<Object> ts = new TestSubscriber<Object>();
  m.subscribe(ts);
  ts.awaitTerminalEvent();
  assertEquals(1, ts.errorCount());
  ts.assertValueCount(1);
}

Code example source: ReactiveX/RxJava

@Test
public void testFirstOrElseWithPredicateOfNoneMatchingThePredicate() {
  Flowable<String> src = Flowable.just("a", "b", "c");
  src.filter(IS_D).first("default").subscribe(wo);
  verify(wo, times(1)).onSuccess(anyString());
  verify(wo, times(1)).onSuccess("default");
  verify(wo, never()).onError(any(Throwable.class));
}

Code example source: ReactiveX/RxJava

@Test
public void testFlatMapTransformsMergeException() {
  Flowable<Integer> onNext = Flowable.error(new TestException());
  Flowable<Integer> onComplete = Flowable.fromIterable(Arrays.asList(4));
  Flowable<Integer> onError = Flowable.fromIterable(Arrays.asList(5));
  Flowable<Integer> source = Flowable.fromIterable(Arrays.asList(10, 20, 30));
  Subscriber<Object> subscriber = TestHelper.mockSubscriber();
  source.flatMap(just(onNext), just(onError), funcThrow0(onComplete)).subscribe(subscriber);
  verify(subscriber).onError(any(TestException.class));
  verify(subscriber, never()).onNext(any());
  verify(subscriber, never()).onComplete();
}

Code example source: ReactiveX/RxJava

@SuppressWarnings({ "unchecked", "rawtypes" })
@Test
public void flatMapMaxConcurrentJustJust() {
  TestSubscriber<Integer> ts = TestSubscriber.create();
  Flowable.just(Flowable.just(1)).flatMap((Function)Functions.identity(), 5).subscribe(ts);
  ts.assertValue(1);
  ts.assertNoErrors();
  ts.assertComplete();
}

Code example source: ReactiveX/RxJava

@Test
public void backFusedErrorConditional() {
  TestSubscriber<Integer> ts = SubscriberFusion.newTest(QueueFuseable.ANY);
  Flowable.<Integer>error(new TestException())
  .observeOn(ImmediateThinScheduler.INSTANCE)
  .filter(Functions.alwaysTrue())
  .subscribe(ts);
  SubscriberFusion.assertFusion(ts, QueueFuseable.ASYNC)
  .assertFailure(TestException.class);
}

Code example source: ReactiveX/RxJava

@Test
public void repeatScheduled() {
  TestSubscriber<Integer> ts = TestSubscriber.create();
  Flowable.just(1).subscribeOn(Schedulers.computation()).repeat(5).subscribe(ts);
  ts.awaitTerminalEvent(5, TimeUnit.SECONDS);
  ts.assertValues(1, 1, 1, 1, 1);
  ts.assertNoErrors();
  ts.assertComplete();
}
