Usage and code examples for the io.reactivex.Observable.doOnTerminate() method

x33g5p2x, reposted on 2022-01-25 under Other

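This article collects code examples for the Java method io.reactivex.Observable.doOnTerminate() and shows how Observable.doOnTerminate() is used in practice. The examples come mainly from GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they should serve as a useful reference. Details of the Observable.doOnTerminate() method:
Package: io.reactivex
Class name: Observable
Method name: doOnTerminate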

Introduction to Observable.doOnTerminate

Modifies the source ObservableSource so that it invokes an action when it calls onComplete or onError.

This differs from doAfterTerminate in that the action runs before the onComplete or onError notification is delivered downstream. Scheduler: doOnTerminate does not operate by default on a particular Scheduler.
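The ordering described above can be illustrated with a small stand-alone sketch. It does not use RxJava. `MiniObserver`, `wrapOnTerminate`, `wrapAfterTerminate`, and `trace` are hypothetical names introduced only for illustration, and the sketch models terminal events only (no onNext, no disposal, no handling of an action that throws). It is a simplified model of the documented ordering, not RxJava's implementation.

```java
import java.util.ArrayList;
import java.util.List;

class TerminateOrderSketch {

    /** Hypothetical stand-in for an Observer; only terminal events are modelled. */
    interface MiniObserver {
        void onComplete();
        void onError(Throwable error);
    }

    /** Models doOnTerminate: run the action first, then forward the terminal event. */
    static MiniObserver wrapOnTerminate(MiniObserver downstream, Runnable action) {
        return new MiniObserver() {
            @Override public void onComplete() {
                action.run();
                downstream.onComplete();
            }
            @Override public void onError(Throwable error) {
                action.run();
                downstream.onError(error);
            }
        };
    }

    /** Models doAfterTerminate: forward the terminal event first, then run the action. */
    static MiniObserver wrapAfterTerminate(MiniObserver downstream, Runnable action) {
        return new MiniObserver() {
            @Override public void onComplete() {
                downstream.onComplete();
                action.run();
            }
            @Override public void onError(Throwable error) {
                downstream.onError(error);
                action.run();
            }
        };
    }

    /** Sends one terminal event through both wrappers and returns the call order. */
    static List<String> trace(boolean fail) {
        List<String> log = new ArrayList<>();
        MiniObserver subscriber = new MiniObserver() {
            @Override public void onComplete() { log.add("onComplete"); }
            @Override public void onError(Throwable error) { log.add("onError"); }
        };
        // Mirrors source.doOnTerminate(a).doAfterTerminate(b).subscribe(subscriber):
        // the operator applied last sits closest to the subscriber.
        MiniObserver chain = wrapOnTerminate(
                wrapAfterTerminate(subscriber, () -> log.add("doAfterTerminate")),
                () -> log.add("doOnTerminate"));
        if (fail) {
            chain.onError(new RuntimeException("boom"));
        } else {
            chain.onComplete();
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(trace(false)); // completion path
        System.out.println(trace(true));  // error path
    }
}
```

In both traces the doOnTerminate action is logged before the subscriber sees the terminal event, and the doAfterTerminate action is logged after it.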

Code examples

Code example source: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void doOnTerminatedNull() {
  just1.doOnTerminate(null);
}

Code example source: ReactiveX/RxJava

@Test
public void doOnTerminateComplete() {
  final AtomicBoolean r = new AtomicBoolean();
  String output = Observable.just("one").doOnTerminate(new Action() {
    @Override
    public void run() {
      r.set(true);
    }
  }).blockingSingle();
  assertEquals("one", output);
  assertTrue(r.get());
}

Code example source: Polidea/RxAndroidBle

@Override
@SuppressWarnings("ConstantConditions")
protected void protectedRun(final ObservableEmitter<T> emitter, final QueueReleaseInterface queueReleaseInterface)
    throws Throwable {
  final Observable<T> operationObservable;
  try {
    operationObservable = operation.asObservable(bluetoothGatt, gattCallback, callbackScheduler);
  } catch (Throwable throwable) {
    queueReleaseInterface.release();
    throw throwable;
  }
  if (operationObservable == null) {
    queueReleaseInterface.release();
    throw new IllegalArgumentException("The custom operation asObservable method must return a non-null observable");
  }
  final QueueReleasingEmitterWrapper<T> emitterWrapper = new QueueReleasingEmitterWrapper<>(emitter, queueReleaseInterface);
  operationObservable
      .doOnTerminate(clearNativeCallbackReferenceAction())
      .subscribe(emitterWrapper);
}

Code example source: alibaba/Tangram-Android

public RxTimer(long interval) {
  this.mInterval = interval;
  this.mStatus = TimerStatus.Waiting;
  this.mIntervalObservable = Observable
      .interval(0, this.mInterval, TimeUnit.MILLISECONDS)
      .doOnSubscribe(new Consumer<Disposable>() {
        @Override
        public void accept(Disposable disposable) throws Exception {
          mStatus = TimerStatus.Running;
          Log.d("RxTimerSupportTest", "accept " + disposable + " status " + mStatus);
        }
      })
      .doOnDispose(new Action() {
        @Override
        public void run() throws Exception {
          mStatus = TimerStatus.Paused;
          Log.d("RxTimerSupportTest", "on dispose " + " status " + mStatus);
        }
      })
      .doOnTerminate(new Action() {
        @Override
        public void run() throws Exception {
          mStatus = TimerStatus.Stopped;
          Log.d("RxTimerSupportTest", "on terminate " + " status " + mStatus);
        }
      })
      .share();
}

Code example source: ReactiveX/RxJava

@Test
public void doOnTerminateError() {
  final AtomicBoolean r = new AtomicBoolean();
  Observable.<String>error(new TestException()).doOnTerminate(new Action() {
    @Override
    public void run() {
      r.set(true);
    }
  })
  .test()
  .assertFailure(TestException.class);
  assertTrue(r.get());
}

Code example source: AzimoLabs/Language-Switcher-Tile

public void selectLanguage(String language) {
  languageSelector
    .select(language)
    .doOnTerminate(view::showWarning)
    .doOnTerminate(view::finish)
    .subscribe();
}

Code example source: io.reactivex/rxjavafx

/**
 * Performs the provided onTerminate action on the FX thread
 * @param onTerminate
 * @param <T>
 */
public static <T> ObservableTransformer<T,T> doOnTerminateFx(Action onTerminate) {
  return obs -> obs.doOnTerminate(() -> runOnFx(onTerminate));
}

Code example source: io.reactivex.rxjava2/rxjavafx

/**
 * Performs the provided onTerminate action on the FX thread
 * @param onTerminate
 * @param <T>
 */
public static <T> ObservableTransformer<T,T> doOnTerminateFx(Action onTerminate) {
  return obs -> obs.doOnTerminate(() -> runOnFx(onTerminate));
}

Code example source: AzimoLabs/Language-Switcher-Tile

private void selectNextLanguage() {
  selector
    .next()
    .doOnTerminate(this::showWarning)
    .subscribe();
}

Code example source: jruesga/rview

private Observable<Boolean> actionObserver(Callable<Boolean> call) {
  return SafeObservable.fromNullCallable(call)
      .subscribeOn(Schedulers.io())
      .observeOn(AndroidSchedulers.mainThread())
      .doOnSubscribe(disposable -> changeInProgressStatus(true))
      .doOnTerminate(() -> changeInProgressStatus(false));
}

Code example source: imuhao/RxPicker

@Override
public void loadAllImage(final Context context) {
  loadAllFolder(context)
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .doOnSubscribe(new Consumer<Disposable>() {
      @Override
      public void accept(@NonNull Disposable disposable) throws Exception {
        view.showWaitDialog();
      }
    })
    .doOnTerminate(new Action() {
      @Override
      public void run() throws Exception {
        view.hideWaitDialog();
      }
    })
    .subscribe(new Consumer<List<ImageFolder>>() {
      @Override
      public void accept(@NonNull List<ImageFolder> imageFolders) throws Exception {
        view.showAllImage(imageFolders);
      }
    }, new Consumer<Throwable>() {
      @Override
      public void accept(@NonNull Throwable throwable) throws Exception {
        T.show(context, context.getString(R.string.load_image_error));
      }
    });
}

Code example source: jruesga/rview

@SuppressWarnings("ConstantConditions")
private Observable<List<ItemModel>> refreshItems() {
  return getDataProducer()
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .doOnSubscribe(disposable ->
        ((WizardActivity)getActivity()).changeInProgressStatus(true))
    .doOnTerminate(() ->
        ((WizardActivity)getActivity()).changeInProgressStatus(false));
}

Code example source: Leeii/LeeFream

@Override
public ObservableSource<T> apply(Observable<T> upstream) {
  return upstream.subscribeOn(Schedulers.io())
      .compose(RxLifecycle.bind((Activity) mView.context()).<T>withObservable())
      .doOnSubscribe(new Consumer<Disposable>() {
        @Override
        public void accept(Disposable disposable) throws Exception {
          mView.showLoading();
        }
      })
      .doOnTerminate(new Action() {
        @Override
        public void run() throws Exception {
          mView.dismissLoading();
        }
      })
      .subscribeOn(AndroidSchedulers.mainThread())
      .observeOn(AndroidSchedulers.mainThread());
}

Code example source: zhpanvip/Retrofit2

@Override
public ObservableSource<T> apply(Observable<T> upstream) {
  return upstream.doOnSubscribe(new Consumer<Disposable>() {
    @Override
    public void accept(Disposable disposable) throws Exception {
    }
  }).doOnTerminate(new Action() {
    @Override
    public void run() throws Exception {
      Activity context;
      if ((context = activityWeakReference.get()) != null
          && !context.isFinishing()) {
        dialogUtils.dismissProgress();
      }
    }
  }).doOnSubscribe(new Consumer<Disposable>() {
    @Override
    public void accept(Disposable disposable) throws Exception {
      /*Activity context;
      if ((context = activityWeakReference.get()) != null
          && !context.isFinishing()) {
        dialogUtils.dismissProgress();
      }*/
    }
  });
}

Code example source: skydoves/SyncMarket

@Override
protected void onCreate(Bundle savedInstanceState) {
  super.onCreate(savedInstanceState);
  setContentView(R.layout.activity_main);
  ButterKnife.bind(this);
  SyncMarket.init(this);
  SyncMarket.setPackageName("com.skydoves.waterdays");
  ProgressBar progressBar = (ProgressBar)findViewById(R.id.progressbar);
  TextView tv_ver = (TextView)findViewById(R.id.tv_ver);
  SyncMarket.getVersionObservable()
      .subscribe(ver -> tv_ver.setText(ver));
  TextView tv_pub = (TextView)findViewById(R.id.tv_pub);
  SyncMarket.getPublishedDateObservable()
      .subscribe(pub -> tv_pub.setText(pub));
  TextView tv_down = (TextView)findViewById(R.id.tv_down);
  SyncMarket.getDownloadsObservable()
      .subscribe(down -> tv_down.setText(down));
  TextView tv_oper = (TextView)findViewById(R.id.tv_oper);
  SyncMarket.getDownloadsObservable()
      .subscribe(oper -> tv_oper.setText(oper));
  TextView tv_rec = (TextView)findViewById(R.id.tv_rec);
  SyncMarket.getRecentChangesObservable()
      .doOnTerminate(() -> progressBar.setVisibility(View.GONE))
      .subscribe(recs -> {
        for (int i = 0; i < recs.length; i++)
          tv_rec.append(recs[i] + "\n");
      });
}

Code example source: laizimo/richeditor

.doOnTerminate(new Action() {
  @Override
  public void run() throws Exception {

Code example source: quanturium/bouquet

getMessageManager().printEvent(getComponentInfo(), RxEvent.COMPLETE);
})
.doOnTerminate(() -> {
  if (getScope() == RxLogger.Scope.ALL || getScope() == RxLogger.Scope.LIFECYCLE)
    getMessageManager().printEvent(getComponentInfo(), RxEvent.TERMINATE);
