Usage and code examples of the io.reactivex.Observable.defer() method

Reposted by x33g5p2x on 2022-01-25 under Other

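This article collects code examples of the io.reactivex.Observable.defer() method in Java and shows how Observable.defer() is used in practice. The examples are drawn mainly from platforms such as GitHub, Stack Overflow, and Maven, were extracted from selected projects, and should serve as a useful reference. Details of the Observable.defer() method:
Package path: io.reactivex.Observable
Class name: Observable
Method name: defer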

About Observable.defer

Returns an Observable that calls an ObservableSource factory to create an ObservableSource for each new Observer that subscribes. That is, for each subscriber, the actual ObservableSource that subscriber observes is determined by the factory function.

The defer operator lets you postpone creating the ObservableSource, and therefore emitting its items, until an Observer subscribes. Because the factory runs again on every subscription, each Observer can easily obtain an updated or refreshed version of the sequence.

Scheduler: defer does not operate by default on a particular Scheduler.
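
To make the "factory per subscriber" idea concrete, here is a minimal plain-Java sketch of the mechanism. It is a model only, not RxJava's implementation: the Source interface, the just/defer/collect helpers, and the DeferSketch class are hypothetical names invented for this illustration, and error handling, completion signals, and disposal are all left out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Plain-Java model of the idea behind defer; NOT the RxJava implementation.
public class DeferSketch {

    /** Hypothetical stand-in for an observable source: pushes items to an observer. */
    interface Source<T> {
        void subscribe(Consumer<T> observer);
    }

    /** Eager source: the items are fixed at the moment the source is created. */
    static <T> Source<T> just(List<T> items) {
        return observer -> items.forEach(observer);
    }

    /** Deferred source: the factory is invoked once for every subscribe() call. */
    static <T> Source<T> defer(Supplier<Source<T>> factory) {
        return observer -> factory.get().subscribe(observer);
    }

    /** Helper: subscribes and collects everything the source emits. */
    static <T> List<T> collect(Source<T> source) {
        List<T> out = new ArrayList<>();
        source.subscribe(out::add);
        return out;
    }

    public static void main(String[] args) {
        AtomicInteger counter = new AtomicInteger();

        // Eager: the counter is read once, when the source is assembled.
        Source<Integer> eager = just(Arrays.asList(counter.incrementAndGet()));
        // Deferred: the counter is read again on every subscription.
        Source<Integer> deferred =
            defer(() -> just(Arrays.asList(counter.incrementAndGet())));

        System.out.println(collect(eager));    // prints [1]
        System.out.println(collect(eager));    // prints [1]
        System.out.println(collect(deferred)); // prints [2]
        System.out.println(collect(deferred)); // prints [3]
    }
}
```

The eager source keeps replaying the value captured at creation time, while the deferred one re-runs its factory for each subscriber. That is the property the RxJava examples below rely on when they put slow or failing work inside the factory.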

Code examples

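Code example source: ReactiveX/RxAndroid

// Imports this excerpt relies on (RxJava 2 and the Android SDK)
import android.os.SystemClock;
import io.reactivex.Observable;
import io.reactivex.ObservableSource;
import java.util.concurrent.Callable;

static Observable<String> sampleObservable() {
  return Observable.defer(new Callable<ObservableSource<? extends String>>() {
    @Override
    public ObservableSource<? extends String> call() throws Exception {
      // Simulate a long-running operation; it runs on each subscription, not when defer() is called
      SystemClock.sleep(5000);
      return Observable.just("one", "two", "three", "four", "five");
    }
  });
}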

Code example source: amitshekhariitbhu/RxJava2-Android-Samples

static Observable<String> sampleObservable() {
  return Observable.defer(new Callable<ObservableSource<? extends String>>() {
    @Override
    public ObservableSource<? extends String> call() {
      // Simulate a long-running operation; it runs on each subscription
      SystemClock.sleep(2000);
      return Observable.just("one", "two", "three", "four", "five");
    }
  });
}

Code example source: amitshekhariitbhu/RxJava2-Android-Samples

public Observable<String> brandDeferObservable() {
  return Observable.defer(new Callable<ObservableSource<? extends String>>() {
    @Override
    public ObservableSource<? extends String> call() {
      return Observable.just(brand);
    }
  });
}

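Code example source: ReactiveX/RxJava

// Excerpt from an anonymous function class; the enclosing call is not shown in the source
@Override
public Observable<String> apply(Long t1) {
  // Each subscription gets a fresh Observable that immediately signals an error
  return Observable.defer(new Callable<Observable<String>>() {
    @Override
    public Observable<String> call() {
      return Observable.<String>error(new Exception("Some exception"));
    }
  });
}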

Code example source: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void deferFunctionNull() {
  Observable.defer(null);
}

Code example source: commonsguy/cw-omnibus

@Override
public void onActivityResult(int requestCode, int resultCode,
                             Intent resultData) {
  if (resultCode == Activity.RESULT_OK) {
    // createDurableContent() is deferred, so it runs on the io() scheduler at the
    // first subscription; cache() then replays the result to later subscribers
    docObservable = Observable
      .defer(() -> Observable.just(createDurableContent(resultData)))
      .subscribeOn(Schedulers.io())
      .cache()
      .observeOn(AndroidSchedulers.mainThread());
    docSub();
  }
}

Code example source: ReactiveX/RxJava

@Test
public void testDeferFunctionThrows() throws Exception {
  Callable<Observable<String>> factory = mock(Callable.class);

  // The factory throws when invoked at subscription time
  when(factory.call()).thenThrow(new TestException());

  Observable<String> result = Observable.defer(factory);

  Observer<String> o = TestHelper.mockObserver();

  result.subscribe(o);

  // The exception is delivered through onError; nothing else is signalled
  verify(o).onError(any(TestException.class));
  verify(o, never()).onNext(any(String.class));
  verify(o, never()).onComplete();
}

Code example source: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void deferFunctionReturnsNull() {
  Observable.defer(new Callable<Observable<Object>>() {
    @Override
    public Observable<Object> call() {
      return null;
    }
  }).blockingLast();
}

Code example source: ReactiveX/RxJava

@Test
public void testDefer() throws Throwable {
  Callable<Observable<String>> factory = mock(Callable.class);
  Observable<String> firstObservable = Observable.just("one", "two");
  Observable<String> secondObservable = Observable.just("three", "four");
  when(factory.call()).thenReturn(firstObservable, secondObservable);
  Observable<String> deferred = Observable.defer(factory);
  verifyZeroInteractions(factory);
  Observer<String> firstObserver = TestHelper.mockObserver();
  deferred.subscribe(firstObserver);
  verify(factory, times(1)).call();
  verify(firstObserver, times(1)).onNext("one");
  verify(firstObserver, times(1)).onNext("two");
  verify(firstObserver, times(0)).onNext("three");
  verify(firstObserver, times(0)).onNext("four");
  verify(firstObserver, times(1)).onComplete();
  Observer<String> secondObserver = TestHelper.mockObserver();
  deferred.subscribe(secondObserver);
  verify(factory, times(2)).call();
  verify(secondObserver, times(0)).onNext("one");
  verify(secondObserver, times(0)).onNext("two");
  verify(secondObserver, times(1)).onNext("three");
  verify(secondObserver, times(1)).onNext("four");
  verify(secondObserver, times(1)).onComplete();
}

Code example source: ReactiveX/RxJava

@Test
public void noCancelPreviousRetry() {
  final AtomicInteger counter = new AtomicInteger();
  final AtomicInteger times = new AtomicInteger();
  Observable<Integer> source = Observable.defer(new Callable<ObservableSource<Integer>>() {
    @Override
    public ObservableSource<Integer> call() throws Exception {
      if (times.getAndIncrement() < 4) {
        return Observable.error(new TestException());
      }
      return Observable.just(1);
    }
  })
  .doOnDispose(new Action() {
    @Override
    public void run() throws Exception {
      counter.getAndIncrement();
    }
  });
  source.retry(5)
  .test()
  .assertResult(1);
  assertEquals(0, counter.get());
}

Code example source: ReactiveX/RxJava

@Test
public void noCancelPreviousRetryWhile() {
  final AtomicInteger counter = new AtomicInteger();
  final AtomicInteger times = new AtomicInteger();
  Observable<Integer> source = Observable.defer(new Callable<ObservableSource<Integer>>() {
    @Override
    public ObservableSource<Integer> call() throws Exception {
      if (times.getAndIncrement() < 4) {
        return Observable.error(new TestException());
      }
      return Observable.just(1);
    }
  })
  .doOnDispose(new Action() {
    @Override
    public void run() throws Exception {
      counter.getAndIncrement();
    }
  });
  source.retry(5, Functions.alwaysTrue())
  .test()
  .assertResult(1);
  assertEquals(0, counter.get());
}

Code example source: ReactiveX/RxJava

@Test
public void noCancelPreviousRetryWhile2() {
  final AtomicInteger counter = new AtomicInteger();
  final AtomicInteger times = new AtomicInteger();
  Observable<Integer> source = Observable.defer(new Callable<ObservableSource<Integer>>() {
    @Override
    public ObservableSource<Integer> call() throws Exception {
      if (times.getAndIncrement() < 4) {
        return Observable.error(new TestException());
      }
      return Observable.just(1);
    }
  })
  .doOnDispose(new Action() {
    @Override
    public void run() throws Exception {
      counter.getAndIncrement();
    }
  });
  source.retry(new BiPredicate<Integer, Throwable>() {
    @Override
    public boolean test(Integer a, Throwable b) throws Exception {
      return a < 5;
    }
  })
  .test()
  .assertResult(1);
  assertEquals(0, counter.get());
}

Code example source: ReactiveX/RxJava

@Test
public void noCancelPreviousRetryUntil() {
  final AtomicInteger counter = new AtomicInteger();
  final AtomicInteger times = new AtomicInteger();
  Observable<Integer> source = Observable.defer(new Callable<ObservableSource<Integer>>() {
    @Override
    public ObservableSource<Integer> call() throws Exception {
      if (times.getAndIncrement() < 4) {
        return Observable.error(new TestException());
      }
      return Observable.just(1);
    }
  })
  .doOnDispose(new Action() {
    @Override
    public void run() throws Exception {
      counter.getAndIncrement();
    }
  });
  source.retryUntil(new BooleanSupplier() {
    @Override
    public boolean getAsBoolean() throws Exception {
      return false;
    }
  })
  .test()
  .assertResult(1);
  assertEquals(0, counter.get());
}

Code example source: ReactiveX/RxJava

@Test
public void testTimeoutSelectorFirstThrows() {
  Observable<Integer> source = Observable.<Integer>never();
  final PublishSubject<Integer> timeout = PublishSubject.create();
  Function<Integer, Observable<Integer>> timeoutFunc = new Function<Integer, Observable<Integer>>() {
    @Override
    public Observable<Integer> apply(Integer t1) {
      return timeout;
    }
  };
  Callable<Observable<Integer>> firstTimeoutFunc = new Callable<Observable<Integer>>() {
    @Override
    public Observable<Integer> call() {
      throw new TestException();
    }
  };
  Observable<Integer> other = Observable.fromIterable(Arrays.asList(100));
  Observer<Object> o = TestHelper.mockObserver();
  source.timeout(Observable.defer(firstTimeoutFunc), timeoutFunc, other).subscribe(o);
  verify(o).onError(any(TestException.class));
  verify(o, never()).onNext(any());
  verify(o, never()).onComplete();
}

Code example source: ReactiveX/RxJava

@Test
public void testDelayWithObservableSubscriptionFunctionThrows() {
  PublishSubject<Integer> source = PublishSubject.create();
  final PublishSubject<Integer> delay = PublishSubject.create();
  Callable<Observable<Integer>> subFunc = new Callable<Observable<Integer>>() {
    @Override
    public Observable<Integer> call() {
      throw new TestException();
    }
  };
  Function<Integer, Observable<Integer>> delayFunc = new Function<Integer, Observable<Integer>>() {
    @Override
    public Observable<Integer> apply(Integer t1) {
      return delay;
    }
  };
  Observer<Object> o = TestHelper.mockObserver();
  InOrder inOrder = inOrder(o);
  source.delay(Observable.defer(subFunc), delayFunc).subscribe(o);
  source.onNext(1);
  delay.onNext(1);
  source.onNext(2);
  inOrder.verify(o).onError(any(TestException.class));
  inOrder.verifyNoMoreInteractions();
  verify(o, never()).onNext(any());
  verify(o, never()).onComplete();
}

Code example source: ReactiveX/RxJava

@Test
public void testDelayWithObservableSubscriptionThrows() {
  PublishSubject<Integer> source = PublishSubject.create();
  final PublishSubject<Integer> delay = PublishSubject.create();
  Callable<Observable<Integer>> subFunc = new Callable<Observable<Integer>>() {
    @Override
    public Observable<Integer> call() {
      return delay;
    }
  };
  Function<Integer, Observable<Integer>> delayFunc = new Function<Integer, Observable<Integer>>() {
    @Override
    public Observable<Integer> apply(Integer t1) {
      return delay;
    }
  };
  Observer<Object> o = TestHelper.mockObserver();
  InOrder inOrder = inOrder(o);
  source.delay(Observable.defer(subFunc), delayFunc).subscribe(o);
  source.onNext(1);
  delay.onError(new TestException());
  source.onNext(2);
  inOrder.verify(o).onError(any(TestException.class));
  inOrder.verifyNoMoreInteractions();
  verify(o, never()).onNext(any());
  verify(o, never()).onComplete();
}

Code example source: ReactiveX/RxJava

@Test
public void testMergeCovariance4() {
  Observable<Movie> o1 = Observable.defer(new Callable<Observable<Movie>>() {
    @Override
    public Observable<Movie> call() {
      return Observable.just(
          new HorrorMovie(),
          new Movie()
      );
    }
  });
  Observable<Media> o2 = Observable.just(new Media(), new HorrorMovie());
  List<Media> values = Observable.merge(o1, o2).toList().blockingGet();
  assertTrue(values.get(0) instanceof HorrorMovie);
  assertTrue(values.get(1) instanceof Movie);
  assertTrue(values.get(2) != null);
  assertTrue(values.get(3) instanceof HorrorMovie);
}

Code example source: ReactiveX/RxJava

@Test
public void testDelayWithObservableSubscriptionNormal() {
  PublishSubject<Integer> source = PublishSubject.create();
  final PublishSubject<Integer> delay = PublishSubject.create();
  Callable<Observable<Integer>> subFunc = new Callable<Observable<Integer>>() {
    @Override
    public Observable<Integer> call() {
      return delay;
    }
  };
  Function<Integer, Observable<Integer>> delayFunc = new Function<Integer, Observable<Integer>>() {
    @Override
    public Observable<Integer> apply(Integer t1) {
      return delay;
    }
  };
  Observer<Object> o = TestHelper.mockObserver();
  InOrder inOrder = inOrder(o);
  source.delay(Observable.defer(subFunc), delayFunc).subscribe(o);
  source.onNext(1);
  delay.onNext(1);
  source.onNext(2);
  delay.onNext(2);
  inOrder.verify(o).onNext(2);
  inOrder.verifyNoMoreInteractions();
  verify(o, never()).onError(any(Throwable.class));
  verify(o, never()).onComplete();
}

Code example source: ReactiveX/RxJava

@Test
public void testDelayWithObservableSubscriptionRunCompletion() {
  PublishSubject<Integer> source = PublishSubject.create();
  final PublishSubject<Integer> sdelay = PublishSubject.create();
  final PublishSubject<Integer> delay = PublishSubject.create();
  Callable<Observable<Integer>> subFunc = new Callable<Observable<Integer>>() {
    @Override
    public Observable<Integer> call() {
      return sdelay;
    }
  };
  Function<Integer, Observable<Integer>> delayFunc = new Function<Integer, Observable<Integer>>() {
    @Override
    public Observable<Integer> apply(Integer t1) {
      return delay;
    }
  };
  Observer<Object> o = TestHelper.mockObserver();
  InOrder inOrder = inOrder(o);
  source.delay(Observable.defer(subFunc), delayFunc).subscribe(o);
  source.onNext(1);
  sdelay.onComplete();
  source.onNext(2);
  delay.onNext(2);
  inOrder.verify(o).onNext(2);
  inOrder.verifyNoMoreInteractions();
  verify(o, never()).onError(any(Throwable.class));
  verify(o, never()).onComplete();
}
