Usage and code examples for io.reactivex.Observable.repeatWhen()

Reposted by x33g5p2x on 2022-01-25, in category: Other

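This article collects code examples for the Java method io.reactivex.Observable.repeatWhen() and shows how it is used in practice. The examples come mainly from GitHub, Stack Overflow, and Maven, and were extracted from selected open-source projects, so they are intended as a practical reference. Details of Observable.repeatWhen():
Fully qualified class: io.reactivex.Observable
Class name: Observable
Method name: repeatWhen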

About Observable.repeatWhen

Returns an Observable that emits the same values as the source ObservableSource, with the exception of onComplete. An onComplete notification from the source results in the emission of a void item to the ObservableSource provided as an argument to the notificationHandler function. If the ObservableSource returned by that function calls onComplete or onError, then repeatWhen calls onComplete or onError on the child subscription. Otherwise, the returned Observable resubscribes to the source ObservableSource.

Scheduler: repeatWhen does not operate by default on a particular Scheduler.
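
The contract described above can be easier to follow as a small, library-free model. The sketch below is not RxJava code: `RepeatWhenModel`, `Signal`, `Result`, and this `repeatWhen` helper are hypothetical names introduced only for illustration. It is synchronous and deliberately ignores source errors, disposal, and scheduling. It only shows that a completion of the source is handed to a handler, and that the handler's answer decides whether the source is subscribed again or the downstream is terminated.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.IntFunction;
import java.util.function.Supplier;

/** Library-free model of the repeatWhen contract; none of these names are RxJava API. */
class RepeatWhenModel {

    /** How the (hypothetical) handler reacts to one completion signal from the source. */
    enum Signal { RESUBSCRIBE, COMPLETE, ERROR }

    /** What the downstream observer sees: the emitted items plus the terminal event. */
    static final class Result<T> {
        final List<T> items = new ArrayList<>();
        Signal terminal; // COMPLETE or ERROR once repeatWhen returns
    }

    /**
     * Replays the source each time the handler answers RESUBSCRIBE.
     * The handler receives 1 for the first completion of the source, 2 for the second, and so on.
     */
    static <T> Result<T> repeatWhen(Supplier<List<T>> source, IntFunction<Signal> handler) {
        Result<T> result = new Result<>();
        int completionIndex = 0;
        while (true) {
            // "Subscribe" to the source and forward its items downstream.
            result.items.addAll(source.get());
            // The source's completion is not forwarded; it is handed to the handler instead.
            Signal signal = handler.apply(++completionIndex);
            if (signal != Signal.RESUBSCRIBE) {
                // The handler terminated, so the downstream is terminated the same way.
                result.terminal = signal;
                return result;
            }
        }
    }

    public static void main(String[] args) {
        // Resubscribe after the first completion, then complete after the second.
        Result<Integer> twice = repeatWhen(
                () -> Arrays.asList(1, 2, 3),
                index -> index < 2 ? Signal.RESUBSCRIBE : Signal.COMPLETE);
        System.out.println(twice.items + " " + twice.terminal); // [1, 2, 3, 1, 2, 3] COMPLETE

        // Fail on the first completion: the items already emitted are followed by an error.
        Result<Integer> failed = repeatWhen(
                () -> Arrays.asList(1, 2, 3),
                index -> Signal.ERROR);
        System.out.println(failed.items + " " + failed.terminal); // [1, 2, 3] ERROR
    }
}
```

In real RxJava the handler is a function over an Observable of completion signals rather than a per-signal callback, so operators such as delay, take, or flatMap shape when and whether the resubscription happens.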

Code examples

Code example source: Polidea/RxAndroidBle

  /**
   * A convenience function creating a transformer that repeats the source observable whenever it completes.
   *
   * @param <T> the type of the transformed observable
   * @return a transformer whose resulting observable never completes (the source is subscribed again)
   */
  @NonNull
  private static <T> ObservableTransformer<T, T> repeatAfterCompleted() {
      // Returning the completion signals unchanged resubscribes on every onComplete.
      return observable -> observable.repeatWhen(completedNotification -> completedNotification);
  }

Code example source: Polidea/RxAndroidBle

  @Override
  public Observable<RxBleInternalScanResult> apply(final Observable<RxBleInternalScanResult> rxBleInternalScanResultObservable) {
      // Take results for windowInMillis, then wait delayToNextWindow before resubscribing.
      return rxBleInternalScanResultObservable.take(windowInMillis, TimeUnit.MILLISECONDS, scheduler)
              .repeatWhen(new Function<Observable<Object>, ObservableSource<?>>() {
                  @Override
                  public ObservableSource<?> apply(Observable<Object> observable) throws Exception {
                      return observable.delay(delayToNextWindow, TimeUnit.MILLISECONDS, scheduler);
                  }
              });
  }

Code example source: ReactiveX/RxJava

  // just1 is defined elsewhere in the test class (not shown in this excerpt).
  @Test(expected = NullPointerException.class)
  public void repeatWhenNull() {
      just1.repeatWhen(null);
  }

Code example source: ReactiveX/RxJava

  @Test(expected = NullPointerException.class)
  public void repeatWhenFunctionReturnsNull() {
      just1.repeatWhen(new Function<Observable<Object>, Observable<Object>>() {
          @Override
          public Observable<Object> apply(Observable<Object> v) {
              // A handler that returns null is rejected with a NullPointerException.
              return null;
          }
      }).blockingSubscribe();
  }

Code example source: ReactiveX/RxJava

  @Test
  public void redoCancel() {
      final TestObserver<Integer> to = new TestObserver<Integer>();
      Observable.just(1)
              .repeatWhen(new Function<Observable<Object>, ObservableSource<Object>>() {
                  @Override
                  public ObservableSource<Object> apply(Observable<Object> o) throws Exception {
                      return o.map(new Function<Object, Object>() {
                          int count;

                          @Override
                          public Object apply(Object v) throws Exception {
                              // Cancel the downstream observer on the first completion signal.
                              if (++count == 1) {
                                  to.cancel();
                              }
                              return v;
                          }
                      });
                  }
              })
              .subscribe(to);
  }

Code example source: Polidea/RxAndroidBle

  .repeatWhen(new Function<Observable<Object>, ObservableSource<?>>() {
      @Override
      public ObservableSource<?> apply(Observable<Object> onWriteFinished) throws Exception {

Code example source: ReactiveX/RxJava

  @Test
  public void noCancelPreviousRepeatWhen() {
      final AtomicInteger counter = new AtomicInteger();
      Observable<Integer> source = Observable.just(1).doOnDispose(new Action() {
          @Override
          public void run() throws Exception {
              counter.getAndIncrement();
          }
      });
      final AtomicInteger times = new AtomicInteger();
      source.repeatWhen(new Function<Observable<Object>, ObservableSource<?>>() {
          @Override
          public ObservableSource<?> apply(Observable<Object> e) throws Exception {
              // Let the first four completion signals through, then complete.
              return e.takeWhile(new Predicate<Object>() {
                  @Override
                  public boolean test(Object v) throws Exception {
                      return times.getAndIncrement() < 4;
                  }
              });
          }
      })
      .test()
      .assertResult(1, 1, 1, 1, 1);
      // The doOnDispose action must never have run.
      assertEquals(0, counter.get());
  }

Code example source: ReactiveX/RxJava

  @Test
  public void shouldDisposeInnerObservable() {
      final PublishSubject<Object> subject = PublishSubject.create();
      final Disposable disposable = Observable.just("Leak")
              .repeatWhen(new Function<Observable<Object>, ObservableSource<Object>>() {
                  @Override
                  public ObservableSource<Object> apply(Observable<Object> completions) throws Exception {
                      return completions.switchMap(new Function<Object, ObservableSource<Object>>() {
                          @Override
                          public ObservableSource<Object> apply(Object ignore) throws Exception {
                              return subject;
                          }
                      });
                  }
              })
              .subscribe();
      assertTrue(subject.hasObservers());
      // Disposing the outer subscription must also dispose the inner subscription to the subject.
      disposable.dispose();
      assertFalse(subject.hasObservers());
  }

Code example source: ReactiveX/RxJava

  @Test
  public void whenTake() {
      Observable.range(1, 3).repeatWhen(new Function<Observable<Object>, ObservableSource<Object>>() {
          @Override
          public ObservableSource<Object> apply(Observable<Object> handler) throws Exception {
              return handler.take(2);
          }
      })
      .test()
      .assertResult(1, 2, 3, 1, 2, 3);
  }

Code example source: ReactiveX/RxJava

  @Test
  public void handlerError() {
      Observable.range(1, 3)
              .repeatWhen(new Function<Observable<Object>, ObservableSource<Object>>() {
                  @Override
                  public ObservableSource<Object> apply(Observable<Object> v) throws Exception {
                      return v.map(new Function<Object, Object>() {
                          @Override
                          public Object apply(Object w) throws Exception {
                              // An error in the handler is delivered downstream after the items already emitted.
                              throw new TestException();
                          }
                      });
                  }
              })
              .test()
              .assertFailure(TestException.class, 1, 2, 3);
  }

Code example source: ReactiveX/RxJava

  @Test
  public void testRepeatWhen() {
      // The source fails instead of completing, so the test expects the error without any repeat.
      Observable.error(new TestException())
              .repeatWhen(new Function<Observable<Object>, ObservableSource<Object>>() {
                  @Override
                  public ObservableSource<Object> apply(Observable<Object> v) throws Exception {
                      return v.delay(10, TimeUnit.SECONDS);
                  }
              })
              .test()
              .awaitDone(5, TimeUnit.SECONDS)
              .assertFailure(TestException.class);
  }

Code example source: Polidea/RxAndroidBle

  timeoutObservable
          .repeatWhen(bufferIsNotEmptyAndOperationHasBeenAcknowledgedAndNotUnsubscribed(
                  writeOperationAckStrategy, byteBuffer, emitterWrapper
          ))

Code example source: ReactiveX/RxJava

  .repeatWhen(new Function<Observable<Object>, ObservableSource<Integer>>() {
      @Override
      public ObservableSource<Integer> apply(Observable<Object> v)

Code example source: imZeJun/RxSample

  private void startAdvancePolling() {
      Log.d(TAG, "startAdvancePolling click");
      Observable<Long> observable = Observable.just(0L).doOnComplete(new Action() {
          @Override
          public void run() throws Exception {
              doWork();
          }
      }).repeatWhen(new Function<Observable<Object>, ObservableSource<Long>>() {
          private long mRepeatCount;

          @Override
          public ObservableSource<Long> apply(Observable<Object> objectObservable) throws Exception {
              // The handler must react to the completion signals; here it does so with flatMap.
              return objectObservable.flatMap(new Function<Object, ObservableSource<Long>>() {
                  @Override
                  public ObservableSource<Long> apply(Object o) throws Exception {
                      if (++mRepeatCount > 4) {
                          // return Observable.empty(); // Emits onComplete, but does not trigger the downstream onComplete callback.
                          return Observable.error(new Throwable("Polling work finished")); // Emits onError, which does trigger the downstream onError callback.
                      }
                      Log.d(TAG, "startAdvancePolling apply");
                      // Wait a little longer before each repeat: 4 s, 5 s, 6 s, 7 s.
                      return Observable.timer(3000 + mRepeatCount * 1000, TimeUnit.MILLISECONDS);
                  }
              });
          }
      });
      DisposableObserver<Long> disposableObserver = getDisposableObserver();
      observable.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(disposableObserver);
      mCompositeDisposable.add(disposableObserver);
  }

Code example source: Carson-Ho/RxJavaLearningMaterial

  observable.repeatWhen(new Function<Observable<Object>, ObservableSource<?>>() {
      @Override

Code example source: Carson-Ho/RxJavaLearningMaterial

  Observable.just(1,2,4).repeatWhen(new Function<Observable<Object>, ObservableSource<?>>() {
      @Override
      public ObservableSource<?> apply(@NonNull Observable<Object> objectObservable) throws Exception {
  Observable.just(1,2,4).repeatWhen(new Function<Observable<Object>, ObservableSource<?>>() {
      @Override
      public ObservableSource<?> apply(@NonNull Observable<Object> objectObservable) throws Exception {

Code example source: akarnokd/akarnokd-misc

  .repeatWhen(v -> v.zipWith(mayRepeat, (a, b) -> b))
