io.reactivex.Observable.unsubscribeOn()方法的使用及代码示例

x33g5p2x  于2022-01-25 转载在 其他  
字(6.9k)|赞(0)|评价(0)|浏览(239)

本文整理了Java中io.reactivex.Observable.unsubscribeOn()方法的一些代码示例,展示了Observable.unsubscribeOn()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Observable.unsubscribeOn()方法的具体详情如下:
包路径:io.reactivex.Observable
类名称:Observable
方法名:unsubscribeOn

Observable.unsubscribeOn介绍

[英]Modifies the source ObservableSource so that subscribers will dispose it on a specified Scheduler.

Scheduler: You specify which Scheduler this operator will use.
[中]修改源ObservableSource,以便订阅者在指定的计划程序上处理它。
调度器:指定该操作员将使用的调度器。

代码示例

代码示例来源:origin: ReactiveX/RxJava

  1. @Test(expected = NullPointerException.class)
  2. public void unsubscribeOnNull() {
  3. just1.unsubscribeOn(null);
  4. }

代码示例来源:origin: Polidea/RxAndroidBle

  1. /**
  2. * Function that returns an observable that emits {@link Boolean#TRUE} every time the button is being clicked. It enables the button
  3. * whenever the returned Observable is being subscribed and disables it when un-subscribed. Takes care of making interactions with
  4. * the button on the proper thread.
  5. *
  6. * @param button the button to wrap into an Observable
  7. * @return the observable
  8. */
  9. @NonNull
  10. private static Observable<Boolean> activatedClicksObservable(Button button) {
  11. return Observable.using(
  12. () -> {
  13. button.setEnabled(true);
  14. return button;
  15. },
  16. aView -> RxView.clicks(aView).map(aVoid -> Boolean.TRUE),
  17. aView -> aView.setEnabled(false)
  18. )
  19. .subscribeOn(AndroidSchedulers.mainThread()) // RxView expects to be subscribed on the Main Thread
  20. .unsubscribeOn(AndroidSchedulers.mainThread());
  21. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void dispose() {
  3. TestHelper.checkDisposed(Observable.just(1).unsubscribeOn(Schedulers.single()));
  4. }

代码示例来源:origin: Polidea/RxAndroidBle

  1. @Override
  2. public Observable<ScanResult> call() {
  3. scanPreconditionVerifier.verify();
  4. final ScanSetup scanSetup = scanSetupBuilder.build(scanSettings, scanFilters);
  5. final Operation<RxBleInternalScanResult> scanOperation = scanSetup.scanOperation;
  6. return operationQueue.queue(scanOperation)
  7. .unsubscribeOn(bluetoothInteractionScheduler)
  8. .compose(scanSetup.scanOperationBehaviourEmulatorTransformer)
  9. .map(internalToExternalScanResultMapFunction)
  10. .mergeWith(RxBleClientImpl.this.<ScanResult>bluetoothAdapterOffExceptionObservable());
  11. }
  12. });

代码示例来源:origin: Polidea/RxAndroidBle

  1. .unsubscribeOn(subscribeScheduler)
  2. .subscribe(new Observer<T>() {
  3. @Override

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void normal() {
  3. final int[] calls = { 0 };
  4. Observable.just(1)
  5. .doOnDispose(new Action() {
  6. @Override
  7. public void run() throws Exception {
  8. calls[0]++;
  9. }
  10. })
  11. .unsubscribeOn(Schedulers.single())
  12. .test()
  13. .assertResult(1);
  14. assertEquals(0, calls[0]);
  15. }

代码示例来源:origin: ReactiveX/RxJava

  1. .unsubscribeOn(uiEventLoop)
  2. .take(2)
  3. .subscribe(observer);

代码示例来源:origin: ReactiveX/RxJava

  1. .unsubscribeOn(uiEventLoop)
  2. .take(2)
  3. .subscribe(observer);

代码示例来源:origin: Polidea/RxAndroidBle

  1. @Override
  2. public ObservableSource<RxBleConnection> call() throws Exception {
  3. final ConnectionComponent connectionComponent = connectionComponentBuilder
  4. .connectionModule(new ConnectionModule(options))
  5. .build();
  6. final Set<ConnectionSubscriptionWatcher> connSubWatchers = connectionComponent.connectionSubscriptionWatchers();
  7. return obtainRxBleConnection(connectionComponent)
  8. .mergeWith(observeDisconnections(connectionComponent))
  9. .delaySubscription(enqueueConnectOperation(connectionComponent))
  10. .doOnSubscribe(new Consumer<Disposable>() {
  11. @Override
  12. public void accept(Disposable disposable) throws Exception {
  13. for (ConnectionSubscriptionWatcher csa : connSubWatchers) {
  14. csa.onConnectionSubscribed();
  15. }
  16. }
  17. })
  18. .doFinally(new Action() {
  19. @Override
  20. public void run() throws Exception {
  21. for (ConnectionSubscriptionWatcher csa : connSubWatchers) {
  22. csa.onConnectionUnsubscribed();
  23. }
  24. }
  25. })
  26. .subscribeOn(callbacksScheduler)
  27. .unsubscribeOn(callbacksScheduler);
  28. }
  29. });

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void error() {
  3. final int[] calls = { 0 };
  4. Observable.error(new TestException())
  5. .doOnDispose(new Action() {
  6. @Override
  7. public void run() throws Exception {
  8. calls[0]++;
  9. }
  10. })
  11. .unsubscribeOn(Schedulers.single())
  12. .test()
  13. .assertFailure(TestException.class);
  14. assertEquals(0, calls[0]);
  15. }

代码示例来源:origin: forkachild/reel-search-android

  1. public static <U> ObservableTransformer<U, U> composeObservable() {
  2. return upstream -> upstream
  3. .subscribeOn(Schedulers.io())
  4. .observeOn(AndroidSchedulers.mainThread())
  5. .unsubscribeOn(Schedulers.io());
  6. }

代码示例来源:origin: LRH1993/RetrofitRxJavaBox

  1. @Override
  2. public ObservableSource apply(Observable upstream) {
  3. return ((Observable) upstream).subscribeOn(Schedulers.io())
  4. .unsubscribeOn(Schedulers.io())
  5. .observeOn(AndroidSchedulers.mainThread());
  6. }
  7. };

代码示例来源:origin: LRH1993/LiveCircle

  1. @Override
  2. public ObservableSource apply(Observable upstream) {
  3. return ((Observable) upstream).subscribeOn(Schedulers.io())
  4. .unsubscribeOn(Schedulers.io())
  5. .observeOn(AndroidSchedulers.mainThread());
  6. }
  7. };

代码示例来源:origin: Tophold/FinancialCustomerView

  1. public static Observable call(Observable<?> observable, Observer observer) {
  2. observable.subscribeOn(Schedulers.io())
  3. .unsubscribeOn(Schedulers.io())
  4. .observeOn(AndroidSchedulers.mainThread())
  5. .subscribe(observer);
  6. return observable;
  7. }
  8. }

代码示例来源:origin: onlyloveyd/JuheNews

  1. private void commonOp(Observable observable, Observer subscriber) {
  2. observable.subscribeOn(Schedulers.io())
  3. .unsubscribeOn(Schedulers.io())
  4. .observeOn(AndroidSchedulers.mainThread())
  5. .subscribe(subscriber);
  6. }
  7. //在访问HttpMethods时创建单例

代码示例来源:origin: leftcoding/GankLy

  1. public void downloadApk(Consumer<InputStream> next, Observer subscriber) {
  2. mDownloadService.downloadApk()
  3. .subscribeOn(Schedulers.io())
  4. .unsubscribeOn(Schedulers.io())
  5. .map(ResponseBody::byteStream)
  6. .observeOn(Schedulers.io())
  7. .doOnNext(next)
  8. .observeOn(AndroidSchedulers.mainThread())
  9. .subscribe(subscriber);
  10. }
  11. }

代码示例来源:origin: huntermr/FastAndroid

  1. @Override
  2. public <T> void startAsync(Observable<T> observable, Observer<T> observer) {
  3. observable
  4. .subscribeOn(Schedulers.io())
  5. .unsubscribeOn(Schedulers.io())
  6. .observeOn(AndroidSchedulers.mainThread())
  7. .compose(this.<T>bind())
  8. .subscribe(observer);
  9. }

代码示例来源:origin: wzmyyj/ZYMK

  1. public void getSmartSearch(final String key, Observer<SearchBox> observer) {
  2. Gson gson = new GsonBuilder().registerTypeAdapter(SearchBox.class, new SearchBox.Deserializer2()).create();
  3. Retrofit retrofit = ReOk.bind(Urls.ZYMK_BaseApi, gson);
  4. SearchService service = retrofit.create(SearchService.class);
  5. Observable<SearchBox> observable = service.getSmartSearch(key);
  6. observable.subscribeOn(Schedulers.io())
  7. .unsubscribeOn(Schedulers.io())
  8. .observeOn(AndroidSchedulers.mainThread())
  9. .subscribe(observer);
  10. }
  11. }

代码示例来源:origin: wzmyyj/ZYMK

  1. public void getComic(int comic_id, Observer<ComicBox> observer) {
  2. Gson gson = new GsonBuilder().registerTypeAdapter(ComicBox.class, new ComicBox.Deserializer2()).create();
  3. Retrofit retrofit = ReOk.bind(Urls.ZYMK_BaseApi, gson);
  4. ComicService service = retrofit.create(ComicService.class);
  5. Observable<ComicBox> observable = service.getComic(comic_id);
  6. observable.subscribeOn(Schedulers.io())
  7. .unsubscribeOn(Schedulers.io())
  8. .observeOn(AndroidSchedulers.mainThread())
  9. .subscribe(observer);
  10. }

代码示例来源:origin: leftcoding/GankLy

  1. protected <T> Observable<T> toObservable(Observable<T> o) {
  2. return o.retry(3)
  3. .subscribeOn(Schedulers.computation())
  4. .unsubscribeOn(Schedulers.computation())
  5. .observeOn(AndroidSchedulers.mainThread());
  6. }
  7. }

相关文章

Observable类方法