Usage and code examples of the io.reactivex.Observable.replay() method

x33g5p2x, reposted on 2022-01-25 under Other

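This article collects code examples of the Java method io.reactivex.Observable.replay() and shows how Observable.replay() is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven and were extracted from selected projects, so they should serve as a useful reference. Details of the Observable.replay() method are as follows:
Package path: io.reactivex.Observable
Class name: Observable
Method name: replay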

Introduction to Observable.replay

Returns a ConnectableObservable that shares a single subscription to the underlying ObservableSource and replays all of its items and notifications to any future Observer. A connectable ObservableSource resembles an ordinary ObservableSource, except that it does not begin emitting items when it is subscribed to, but only when its connect method is called.

Scheduler: This version of replay does not operate by default on a particular Scheduler.
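
The connect-gated behavior described above can be sketched as follows. This is a minimal illustration that assumes RxJava 2.x (the io.reactivex package) is on the classpath; the class ReplayConnectSketch and its Result holder are hypothetical names introduced only for this example.

```java
import io.reactivex.Observable;
import io.reactivex.observables.ConnectableObservable;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; not part of RxJava.
final class ReplayConnectSketch {

    static final class Result {
        final List<Integer> early = new ArrayList<>();
        final List<Integer> late = new ArrayList<>();
        boolean emptyBeforeConnect;
        int sourceSubscriptions;
    }

    static Result run() {
        Result r = new Result();
        AtomicInteger subscriptions = new AtomicInteger();

        ConnectableObservable<Integer> replayed = Observable.range(1, 3)
                .doOnSubscribe(d -> subscriptions.incrementAndGet())
                .replay();

        // Subscribing alone does not start the source.
        replayed.subscribe(r.early::add);
        r.emptyBeforeConnect = r.early.isEmpty();

        // connect() subscribes to the source once; range emits synchronously.
        replayed.connect();

        // A late observer receives the buffered items again.
        replayed.subscribe(r.late::add);

        r.sourceSubscriptions = subscriptions.get();
        return r;
    }

    public static void main(String[] args) {
        Result r = run();
        System.out.println(r.early + " " + r.late + " " + r.sourceSubscriptions);
    }
}
```

Both observers end up with the same three items even though the source was subscribed to only once, which is the sharing behavior the description refers to.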

Code examples

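Code example source: ReactiveX/RxJava

    // Fragment of a helper class: parent and bufferSize are fields of the
    // enclosing class, whose closing brace is omitted here.
    @Override
    public ConnectableObservable<T> call() {
        return parent.replay(bufferSize);
    }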
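
To make the effect of the bufferSize argument concrete, here is a small sketch, again assuming RxJava 2.x. BoundedReplaySketch is a hypothetical class name, and a PublishSubject stands in for an arbitrary live source.

```java
import io.reactivex.observables.ConnectableObservable;
import io.reactivex.subjects.PublishSubject;

import java.util.ArrayList;
import java.util.List;

// Hypothetical demo class; not part of RxJava.
final class BoundedReplaySketch {

    static List<Integer> run() {
        PublishSubject<Integer> source = PublishSubject.create();

        // Keep at most the 2 most recent items for late observers.
        ConnectableObservable<Integer> replayed = source.replay(2);
        replayed.connect();

        for (int i = 1; i <= 5; i++) {
            source.onNext(i);
        }

        // The late observer is replayed only 4 and 5, then sees live items.
        List<Integer> late = new ArrayList<>();
        replayed.subscribe(late::add);
        source.onNext(6);

        return late;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [4, 5, 6]
    }
}
```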

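Code example source: ReactiveX/RxJava

    // Fragment of a helper class: parent is a field of the enclosing class,
    // whose closing brace is omitted here.
    @Override
    public ConnectableObservable<T> call() {
        return parent.replay();
    }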

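Code example source: ReactiveX/RxJava

    // Fragment of a helper class: parent, bufferSize, time, unit and scheduler
    // are fields of the enclosing class, whose closing brace is omitted here.
    @Override
    public ConnectableObservable<T> call() {
        return parent.replay(bufferSize, time, unit, scheduler);
    }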
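
The size-and-time-bounded overload can be exercised deterministically with a TestScheduler, which supplies virtual time for the timestamps. The following is a sketch under the assumption that RxJava 2.x is available; TimedReplaySketch is a hypothetical class name, and the 1-second window and buffer size of 10 are arbitrary values chosen for illustration.

```java
import io.reactivex.observables.ConnectableObservable;
import io.reactivex.schedulers.TestScheduler;
import io.reactivex.subjects.PublishSubject;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; not part of RxJava.
final class TimedReplaySketch {

    static List<Integer> run() {
        TestScheduler scheduler = new TestScheduler();
        PublishSubject<Integer> source = PublishSubject.create();

        // Replay at most 10 items, and only those younger than 1 second.
        ConnectableObservable<Integer> replayed =
                source.replay(10, 1, TimeUnit.SECONDS, scheduler);
        replayed.connect();

        source.onNext(1);                               // stamped at t = 0 s
        scheduler.advanceTimeBy(2, TimeUnit.SECONDS);   // virtual clock now at 2 s
        source.onNext(2);                               // stamped at t = 2 s

        // Item 1 is older than the 1-second window, so only 2 is replayed.
        List<Integer> late = new ArrayList<>();
        replayed.subscribe(late::add);
        return late;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [2]
    }
}
```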

Code example source: ReactiveX/RxJava

    // just1 is a field of the surrounding test class (an Observable<Integer>).
    @Test(expected = NullPointerException.class)
    public void replayBoundedSelectorNull() {
        just1.replay((Function<Observable<Integer>, Observable<Integer>>)null, 1, 1, TimeUnit.SECONDS);
    }

Code example source: ReactiveX/RxJava

    // A null Scheduler is rejected with a NullPointerException.
    @Test(expected = NullPointerException.class)
    public void replayTimeBoundedSchedulerNull() {
        just1.replay(1, TimeUnit.SECONDS, null);
    }

Code example source: ReactiveX/RxJava

    // A valid selector does not help if the Scheduler argument is null.
    @Test(expected = NullPointerException.class)
    public void replaySelectorTimeBoundedSchedulerNull() {
        just1.replay(new Function<Observable<Integer>, Observable<Integer>>() {
            @Override
            public Observable<Integer> apply(Observable<Integer> v) {
                return v;
            }
        }, 1, TimeUnit.SECONDS, null);
    }

Code example source: ReactiveX/RxJava

    @Test
    public void testUnsubscribeSource() throws Exception {
        Action unsubscribe = mock(Action.class);
        Observable<Integer> o = Observable.just(1).doOnDispose(unsubscribe).replay().autoConnect();
        o.subscribe();
        o.subscribe();
        o.subscribe();
        // The source completes normally, so the dispose action must never run.
        verify(unsubscribe, never()).run();
    }
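
The replay().autoConnect() combination used in the test above is often applied as a simple cache. The sketch below, which assumes RxJava 2.x, counts how many times the source is subscribed to with and without it. AutoConnectCacheSketch and its method names are hypothetical and exist only for this illustration.

```java
import io.reactivex.Observable;

import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; not part of RxJava.
final class AutoConnectCacheSketch {

    /** Source subscriptions when observers share one replayed connection. */
    static int cachedSubscriptionsFor(int observerCount) {
        AtomicInteger subscriptions = new AtomicInteger();
        Observable<Integer> cached = Observable.range(1, 3)
                .doOnSubscribe(d -> subscriptions.incrementAndGet())
                .replay()
                .autoConnect(); // connects when the first observer subscribes
        for (int i = 0; i < observerCount; i++) {
            cached.subscribe();
        }
        return subscriptions.get();
    }

    /** Source subscriptions when every observer subscribes to the cold source. */
    static int plainSubscriptionsFor(int observerCount) {
        AtomicInteger subscriptions = new AtomicInteger();
        Observable<Integer> plain = Observable.range(1, 3)
                .doOnSubscribe(d -> subscriptions.incrementAndGet());
        for (int i = 0; i < observerCount; i++) {
            plain.subscribe();
        }
        return subscriptions.get();
    }

    public static void main(String[] args) {
        System.out.println(cachedSubscriptionsFor(3)); // prints 1
        System.out.println(plainSubscriptionsFor(3));  // prints 3
    }
}
```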

Code example source: ReactiveX/RxJava

    // A selector that returns null fails once the result is subscribed to.
    @Test(expected = NullPointerException.class)
    public void replaySelectorReturnsNull() {
        just1.replay(new Function<Observable<Integer>, Observable<Object>>() {
            @Override
            public Observable<Object> apply(Observable<Integer> o) {
                return null;
            }
        }).blockingSubscribe();
    }

Code example source: ReactiveX/RxJava

    // A null TimeUnit is rejected with a NullPointerException.
    @Test(expected = NullPointerException.class)
    public void replaySelectorTimeBoundedUnitNull() {
        just1.replay(new Function<Observable<Integer>, Observable<Integer>>() {
            @Override
            public Observable<Integer> apply(Observable<Integer> v) {
                return v;
            }
        }, 1, null, Schedulers.single());
    }

Code example source: ReactiveX/RxJava

    // Same null-returning selector, here with the size-and-time-bounded overload.
    @Test(expected = NullPointerException.class)
    public void replayBoundedSelectorReturnsNull() {
        just1.replay(new Function<Observable<Integer>, Observable<Object>>() {
            @Override
            public Observable<Object> apply(Observable<Integer> v) {
                return null;
            }
        }, 1, 1, TimeUnit.SECONDS).blockingSubscribe();
    }

Code example source: ReactiveX/RxJava

    // The null selector result is reported to the observer as an onError signal.
    @Test
    public void replaySelectorReturnsNull() {
        Observable.just(1)
        .replay(new Function<Observable<Integer>, Observable<Object>>() {
            @Override
            public Observable<Object> apply(Observable<Integer> v) throws Exception {
                return null;
            }
        })
        .test()
        .assertFailureAndMessage(NullPointerException.class, "The selector returned a null ObservableSource");
    }

Code example source: ReactiveX/RxJava

    // Same check for the overload that also takes a Scheduler.
    @Test
    public void replaySelectorReturnsNullScheduled() {
        Observable.just(1)
        .replay(new Function<Observable<Integer>, Observable<Object>>() {
            @Override
            public Observable<Object> apply(Observable<Integer> v) throws Exception {
                return null;
            }
        }, Schedulers.trampoline())
        .test()
        .assertFailureAndMessage(NullPointerException.class, "The selector returned a null ObservableSource");
    }
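
The selector tests above only cover the failure path. As a complement, the following sketch shows what a non-null selector might look like, assuming RxJava 2.x: the function receives the shared, replaying Observable and may subscribe to it more than once while the source itself is subscribed to only once. ReplaySelectorSketch and its Result holder are hypothetical names.

```java
import io.reactivex.Observable;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; not part of RxJava.
final class ReplaySelectorSketch {

    static final class Result {
        final List<Integer> items = new ArrayList<>();
        int sourceSubscriptions;
    }

    static Result run() {
        Result r = new Result();
        AtomicInteger subscriptions = new AtomicInteger();

        // The selector concatenates the shared sequence with itself; the second
        // pass is served from the replay buffer rather than from the source.
        Observable<Integer> twice = Observable.range(1, 3)
                .doOnSubscribe(d -> subscriptions.incrementAndGet())
                .replay(shared -> Observable.concat(shared, shared));

        twice.subscribe(r.items::add);
        r.sourceSubscriptions = subscriptions.get();
        return r;
    }

    public static void main(String[] args) {
        Result r = run();
        System.out.println(r.items + " " + r.sourceSubscriptions);
    }
}
```

Unlike the no-argument replay(), this overload returns a plain Observable, so no explicit connect call is needed; the connection is made when the result is subscribed to.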

Code example source: ReactiveX/RxJava

    // Time-bounded replay: items are retained for up to one minute.
    @Test
    public void replayTime() {
        Observable.just(1).replay(1, TimeUnit.MINUTES)
        .autoConnect()
        .test()
        .awaitDone(5, TimeUnit.SECONDS)
        .assertResult(1);
    }

Code example source: ReactiveX/RxJava

    // Size- and time-bounded replay: at most 1 item, retained for up to 1 millisecond.
    @Test
    public void replaySizeAndTime() {
        Observable.just(1).replay(1, 1, TimeUnit.MILLISECONDS)
        .autoConnect()
        .test()
        .awaitDone(5, TimeUnit.SECONDS)
        .assertResult(1);
    }

Code example source: ReactiveX/RxJava

    // The replay operator exposes its upstream through HasUpstreamObservableSource.
    @Test
    public void source() {
        Observable<Integer> source = Observable.range(1, 3);
        assertSame(source, (((HasUpstreamObservableSource<?>)source.replay())).source());
    }

Code example source: ReactiveX/RxJava

    // Size-bounded replay with a Scheduler argument.
    @Test
    public void replaySizeScheduler() {
        Observable.just(1).replay(1, Schedulers.computation())
        .autoConnect()
        .test()
        .awaitDone(5, TimeUnit.SECONDS)
        .assertResult(1);
    }

Code example source: ReactiveX/RxJava

    // Integer.MAX_VALUE is accepted as a buffer size.
    @Test
    public void replayMaxInt() {
        Observable.range(1, 2)
        .replay(Integer.MAX_VALUE)
        .autoConnect()
        .test()
        .assertResult(1, 2);
    }

Code example source: ReactiveX/RxJava

    // test(true) creates an already-disposed TestObserver, so nothing is received.
    @Test
    public void cancelOnArrival() {
        Observable.range(1, 2)
        .replay(Integer.MAX_VALUE)
        .autoConnect()
        .test(true)
        .assertEmpty();
    }

Code example source: ReactiveX/RxJava

    @Test
    public void testTake() {
        TestObserver<Integer> to = new TestObserver<Integer>();
        Observable<Integer> cached = Observable.range(1, 100).replay().autoConnect();
        cached.take(10).subscribe(to);
        to.assertNoErrors();
        to.assertTerminated();
        to.assertValues(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
        // FIXME no longer assertable
        // ts.assertUnsubscribed();
    }

Code example source: ReactiveX/RxJava

    @Test
    public void replayIsUnsubscribed() {
        ConnectableObservable<Integer> co = Observable.just(1).concatWith(Observable.<Integer>never())
        .replay();
        // Only meaningful if the ConnectableObservable implementation is itself a Disposable.
        if (co instanceof Disposable) {
            assertTrue(((Disposable)co).isDisposed());
            Disposable connection = co.connect();
            assertFalse(((Disposable)co).isDisposed());
            connection.dispose();
            assertTrue(((Disposable)co).isDisposed());
        }
    }
