Usage and code examples for the io.reactivex.Flowable.replay() method

Reposted by x33g5p2x on 2022-01-19 under "Other"

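This article collects code examples for the Java method io.reactivex.Flowable.replay() and shows how Flowable.replay() is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven, were extracted from selected projects, and are meant as a practical reference. The details of Flowable.replay() are as follows:
Package path: io.reactivex.Flowable
Class name: Flowable
Method name: replay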

About Flowable.replay

Returns a ConnectableFlowable that shares a single subscription to the underlying Publisher and replays all of its items and notifications to any future Subscriber. A connectable Publisher resembles an ordinary Publisher, except that it does not begin emitting items when it is subscribed to, but only when its connect method is called.
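
To make the connect behavior described above concrete, here is a minimal sketch. It assumes RxJava 2.x (the io.reactivex package) is on the classpath. The class name ReplayConnectSketch, the upstreamSubscriptions counter, and the log labels are invented for this illustration and are not part of the library.

```java
import io.reactivex.Flowable;
import io.reactivex.flowables.ConnectableFlowable;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; only Flowable and ConnectableFlowable come from RxJava.
class ReplayConnectSketch {

    /** Counts how many times the upstream source is actually subscribed to. */
    static final AtomicInteger upstreamSubscriptions = new AtomicInteger();

    static List<String> run() {
        List<String> log = new ArrayList<>();
        upstreamSubscriptions.set(0);

        ConnectableFlowable<Integer> replayed = Flowable.range(1, 3)
                .doOnSubscribe(s -> upstreamSubscriptions.incrementAndGet())
                .replay();

        // Subscribing alone does not start the upstream.
        replayed.subscribe(v -> log.add("early:" + v));
        log.add("before-connect:" + upstreamSubscriptions.get());

        // connect() subscribes to the upstream once and starts emission.
        replayed.connect();

        // A subscriber that arrives later receives the cached items.
        replayed.subscribe(v -> log.add("late:" + v));

        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Because range is a synchronous source, the log should begin with before-connect:0, followed by the three early items and then the three late items, and the counter should end at 1: both subscribers share the single upstream subscription.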

Backpressure: This operator supports backpressure. Note that the upstream requests are determined by the child Subscriber which requests the largest amount: i.e., two child Subscribers with requests of 10 and 100 will request 100 elements from the underlying Publisher sequence.
Scheduler: This version of replay does not operate by default on a particular Scheduler.
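
The backpressure rule (the largest child request determines the upstream request, rather than the sum) can be illustrated with a small arithmetic model. This is a simplified, standard-library-only sketch and not RxJava's actual implementation. The class MaxRequestModel and its methods are hypothetical names, and overflow handling is left out.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified model of the "largest child request wins" rule; not RxJava code.
class MaxRequestModel {

    // Cumulative amount requested by each child subscriber.
    private final Map<String, Long> childTotals = new HashMap<>();

    // Cumulative amount already requested from the upstream source.
    private long requestedUpstream;

    /** Records a child request and returns how much more must be requested upstream. */
    long request(String child, long n) {
        long total = childTotals.merge(child, n, Long::sum);
        // Only the part that exceeds what was already requested goes upstream.
        long extra = Math.max(0L, total - requestedUpstream);
        requestedUpstream += extra;
        return extra;
    }

    long requestedUpstream() {
        return requestedUpstream;
    }

    public static void main(String[] args) {
        MaxRequestModel model = new MaxRequestModel();
        System.out.println(model.request("a", 10));    // first child asks for 10
        System.out.println(model.request("b", 100));   // second child asks for 100
        System.out.println(model.requestedUpstream()); // total asked of the source
    }
}
```

In this model the first child triggers an upstream request of 10, the second adds only the missing 90, and the source is asked for 100 items in total, matching the 10-and-100 example in the description.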

Code examples

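Code example source: ReactiveX/RxJava

// Excerpt from a Callable implementation; parent is a Flowable<T> field
// of the enclosing class (not shown).
@Override
public ConnectableFlowable<T> call() {
  return parent.replay();
}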

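Code example source: ReactiveX/RxJava

// Excerpt from a Callable implementation; parent and bufferSize are fields
// of the enclosing class (not shown).
@Override
public ConnectableFlowable<T> call() {
  return parent.replay(bufferSize);
}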

Code example source: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void replaySelectorTimeBoundedSchedulerNull() {
  just1.replay(new Function<Flowable<Integer>, Publisher<Integer>>() {
    @Override
    public Publisher<Integer> apply(Flowable<Integer> v) {
      return v;
    }
  }, 1, TimeUnit.SECONDS, null);
}

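Code example source: ReactiveX/RxJava

// Excerpt from a test class; the enclosing class is not shown.
@SuppressWarnings({ "unchecked", "rawtypes" })
@Override
public Publisher<Integer> createPublisher(long elements) {
  // The identity selector returns the replayed Flowable itself.
  return Flowable.range(0, (int) elements).replay((Function) Functions.identity());
}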

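Code example source: ReactiveX/RxJava

// Excerpt from a test class; the enclosing class is not shown.
@Override
public Publisher<Integer> createPublisher(long elements) {
  // autoConnect() connects as soon as the first Subscriber subscribes.
  return Flowable.range(0, (int) elements).replay().autoConnect();
}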

Code example source: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void replayBoundedUnitNull() {
  just1.replay(new Function<Flowable<Integer>, Publisher<Integer>>() {
    @Override
    public Publisher<Integer> apply(Flowable<Integer> v) {
      return v;
    }
  }, 1, 1, null).blockingSubscribe();
}

Code example source: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void replaySelectorTimeBoundedUnitNull() {
  just1.replay(new Function<Flowable<Integer>, Publisher<Integer>>() {
    @Override
    public Publisher<Integer> apply(Flowable<Integer> v) {
      return v;
    }
  }, 1, null, Schedulers.single());
}

Code example source: ReactiveX/RxJava

@Test
public void badRequest() {
  TestHelper.assertBadRequestReported(
    Flowable.never()
    .replay()
  );
}

Code example source: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void replayTimeBoundedSelectorReturnsNull() {
  just1.replay(new Function<Flowable<Integer>, Publisher<Object>>() {
    @Override
    public Publisher<Object> apply(Flowable<Integer> v) {
      return null;
    }
  }, 1, TimeUnit.SECONDS, Schedulers.single()).blockingSubscribe();
}

Code example source: ReactiveX/RxJava

@Test
public void testUnsubscribeSource() throws Exception {
  Action unsubscribe = mock(Action.class);
  Flowable<Integer> f = Flowable.just(1).doOnCancel(unsubscribe).replay().autoConnect();
  f.subscribe();
  f.subscribe();
  f.subscribe();
  verify(unsubscribe, never()).run();
}

Code example source: ReactiveX/RxJava

@Test
public void source() {
  Flowable<Integer> source = Flowable.range(1, 3);
  assertSame(source, (((HasUpstreamPublisher<?>)source.replay())).source());
}

Code example source: ReactiveX/RxJava

@Test
public void replaySelectorSizeScheduler() {
  Flowable.just(1).replay(Functions.<Flowable<Integer>>identity(), 1, Schedulers.io())
  .test()
  .awaitDone(5, TimeUnit.SECONDS)
  .assertResult(1);
}

Code example source: ReactiveX/RxJava

@Test
public void replaySelectorReturnsNull() {
  Flowable.just(1)
  .replay(new Function<Flowable<Integer>, Publisher<Object>>() {
    @Override
    public Publisher<Object> apply(Flowable<Integer> v) throws Exception {
      return null;
    }
  }, Schedulers.trampoline())
  .test()
  .assertFailureAndMessage(NullPointerException.class, "The selector returned a null Publisher");
}

Code example source: ReactiveX/RxJava

@Test
public void cancelOnArrival() {
  Flowable.range(1, 2)
  .replay(Integer.MAX_VALUE)
  .autoConnect()
  .test(Long.MAX_VALUE, true)
  .assertEmpty();
}

Code example source: ReactiveX/RxJava

@Test
public void testColdReplayNoBackpressure() {
  Flowable<Integer> source = Flowable.range(0, 1000).replay().autoConnect();
  TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
  source.subscribe(ts);
  ts.assertNoErrors();
  ts.assertTerminated();
  List<Integer> onNextEvents = ts.values();
  assertEquals(1000, onNextEvents.size());
  for (int i = 0; i < 1000; i++) {
    assertEquals((Integer)i, onNextEvents.get(i));
  }
}

Code example source: ReactiveX/RxJava

@Test
public void replaySizeAndTime() {
  Flowable.just(1).replay(1, 1, TimeUnit.MILLISECONDS)
  .autoConnect()
  .test()
  .awaitDone(5, TimeUnit.SECONDS)
  .assertResult(1);
}

Code example source: ReactiveX/RxJava

@Test
public void replaySelectorTime() {
  Flowable.just(1).replay(Functions.<Flowable<Integer>>identity(), 1, TimeUnit.MINUTES)
  .test()
  .awaitDone(5, TimeUnit.SECONDS)
  .assertResult(1);
}

Code example source: ReactiveX/RxJava

@Test
public void replayIsUnsubscribed() {
  ConnectableFlowable<Integer> cf = Flowable.just(1)
  .replay();
  if (cf instanceof Disposable) {
    assertTrue(((Disposable)cf).isDisposed());
    Disposable connection = cf.connect();
    assertFalse(((Disposable)cf).isDisposed());
    connection.dispose();
    assertTrue(((Disposable)cf).isDisposed());
  }
}

Code example source: ReactiveX/RxJava

@Test
public void replayScheduler() {
  Flowable.just(1).replay(Schedulers.computation())
  .autoConnect()
  .test()
  .awaitDone(5, TimeUnit.SECONDS)
  .assertResult(1);
}

Code example source: ReactiveX/RxJava

@Test
public void timedNoOutdatedData() {
  TestScheduler scheduler = new TestScheduler();
  Flowable<Integer> source = Flowable.just(1)
      .replay(2, TimeUnit.SECONDS, scheduler)
      .autoConnect();
  source.test().assertResult(1);
  source.test().assertResult(1);
  scheduler.advanceTimeBy(3, TimeUnit.SECONDS);
  source.test().assertResult();
}
