Usage and code examples of the rx.Observable.doOnUnsubscribe() method

Reposted by x33g5p2x on 2022-01-25 in Other

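This article collects code examples of the Java method rx.Observable.doOnUnsubscribe() and shows how Observable.doOnUnsubscribe() is used in practice. The examples come mainly from GitHub, Stack Overflow, and Maven, extracted from selected projects, and are intended as practical reference material. Details of the Observable.doOnUnsubscribe() method:
Package path: rx.Observable
Class name: Observable
Method name: doOnUnsubscribe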

About Observable.doOnUnsubscribe

Modifies the source Observable so that it invokes the given action when it is unsubscribed from its subscribers. Each un-subscription will result in an invocation of the given action except when the source Observable is reference counted, in which case the source Observable will invoke the given action for the very last un-subscription.

Scheduler: doOnUnsubscribe does not operate by default on a particular Scheduler.
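
The difference between "every un-subscription" and "only the very last un-subscription" can be easier to see in a small model. The sketch below is plain Java and deliberately does not use RxJava: `Handle`, `subscribePlain`, and `RefCountedSource` are hypothetical names invented for illustration, and the code only mimics the counting behaviour described above, not the library's actual implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Plain-Java model of the two behaviours described above.
// None of these types are RxJava classes; all names are hypothetical.
class UnsubscribeSemanticsSketch {

    /** Hypothetical stand-in for a subscription handle. */
    interface Handle {
        void unsubscribe();
    }

    /** Ordinary source: each subscriber's un-subscription runs the action once. */
    static Handle subscribePlain(Runnable onUnsubscribe) {
        AtomicBoolean unsubscribed = new AtomicBoolean(false);
        return () -> {
            // Guard so that calling unsubscribe() twice does not run the action twice.
            if (unsubscribed.compareAndSet(false, true)) {
                onUnsubscribe.run();
            }
        };
    }

    /** Reference-counted source: the action runs only when the last subscriber leaves. */
    static final class RefCountedSource {
        private final AtomicInteger subscriberCount = new AtomicInteger();
        private final Runnable onUnsubscribe;

        RefCountedSource(Runnable onUnsubscribe) {
            this.onUnsubscribe = onUnsubscribe;
        }

        Handle subscribe() {
            subscriberCount.incrementAndGet();
            AtomicBoolean unsubscribed = new AtomicBoolean(false);
            return () -> {
                if (unsubscribed.compareAndSet(false, true)
                        && subscriberCount.decrementAndGet() == 0) {
                    onUnsubscribe.run();
                }
            };
        }
    }

    /** Subscribes n times to a plain source, unsubscribes all, returns the action call count. */
    static int countPlain(int n) {
        AtomicInteger calls = new AtomicInteger();
        List<Handle> handles = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            handles.add(subscribePlain(calls::incrementAndGet));
        }
        handles.forEach(Handle::unsubscribe);
        return calls.get();
    }

    /** Same scenario against the reference-counted model. */
    static int countRefCounted(int n) {
        AtomicInteger calls = new AtomicInteger();
        RefCountedSource source = new RefCountedSource(calls::incrementAndGet);
        List<Handle> handles = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            handles.add(source.subscribe());
        }
        handles.forEach(Handle::unsubscribe);
        return calls.get();
    }

    public static void main(String[] args) {
        System.out.println("plain: " + countPlain(3));             // prints "plain: 3"
        System.out.println("ref-counted: " + countRefCounted(3)); // prints "ref-counted: 1"
    }
}
```

In RxJava itself, the reference-counted case typically arises when doOnUnsubscribe is applied upstream of share(), as several of the Hystrix stream constructors in this article do.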

Code examples

Code example source: PipelineAI/pipeline

    public CollapsedRequestSubject(final R arg, final RequestBatch<?, T, R> containingBatch) {
        if (arg == RequestCollapser.NULL_SENTINEL) {
            this.argument = null;
        } else {
            this.argument = arg;
        }
        this.subjectWithAccounting = subject
                .doOnSubscribe(new Action0() {
                    @Override
                    public void call() {
                        outstandingSubscriptions++;
                    }
                })
                .doOnUnsubscribe(new Action0() {
                    @Override
                    public void call() {
                        outstandingSubscriptions--;
                        // Once no subscriber is left, drop this argument from the batch
                        if (outstandingSubscriptions == 0) {
                            containingBatch.remove(arg);
                        }
                    }
                });
    }

Code example source: PipelineAI/pipeline

    public Observable<R> toObservableWithStateCopiedInto(final AbstractCommand<R> commandToCopyStateInto) {
        // Ensures that only one of the three callbacks below runs its completion logic
        final AtomicBoolean completionLogicRun = new AtomicBoolean(false);
        return cachedObservable
                .doOnError(new Action1<Throwable>() {
                    @Override
                    public void call(Throwable throwable) {
                        if (completionLogicRun.compareAndSet(false, true)) {
                            commandCompleted(commandToCopyStateInto);
                        }
                    }
                })
                .doOnCompleted(new Action0() {
                    @Override
                    public void call() {
                        if (completionLogicRun.compareAndSet(false, true)) {
                            commandCompleted(commandToCopyStateInto);
                        }
                    }
                })
                .doOnUnsubscribe(new Action0() {
                    @Override
                    public void call() {
                        if (completionLogicRun.compareAndSet(false, true)) {
                            commandUnsubscribed(commandToCopyStateInto);
                        }
                    }
                });
    }

Code example source: bluelinelabs/Conductor

    @NonNull
    @Override
    protected View onCreateView(@NonNull LayoutInflater inflater, @NonNull ViewGroup container) {
        Log.i(TAG, "onCreateView() called");
        View view = inflater.inflate(R.layout.controller_lifecycle, container, false);
        view.setBackgroundColor(ContextCompat.getColor(container.getContext(), R.color.red_300));
        unbinder = ButterKnife.bind(this, view);
        tvTitle.setText(getResources().getString(R.string.rxlifecycle_title, TAG));
        Observable.interval(1, TimeUnit.SECONDS)
                .doOnUnsubscribe(new Action0() {
                    @Override
                    public void call() {
                        Log.i(TAG, "Unsubscribing from onCreateView()");
                    }
                })
                .compose(this.<Long>bindUntilEvent(ControllerEvent.DESTROY_VIEW))
                .subscribe(new Action1<Long>() {
                    @Override
                    public void call(Long num) {
                        Log.i(TAG, "Started in onCreateView(), running until onDestroyView(): " + num);
                    }
                });
        return view;
    }

Code example source: bluelinelabs/Conductor

    public RxLifecycleController() {
        Observable.interval(1, TimeUnit.SECONDS)
                .doOnUnsubscribe(new Action0() {
                    @Override
                    public void call() {
                        Log.i(TAG, "Unsubscribing from constructor");
                    }
                })
                .compose(this.<Long>bindUntilEvent(ControllerEvent.DESTROY))
                .subscribe(new Action1<Long>() {
                    @Override
                    public void call(Long num) {
                        Log.i(TAG, "Started in constructor, running until onDestroy(): " + num);
                    }
                });
    }

Code example source: PipelineAI/pipeline

    /**
     * @deprecated Not for public use. Please use {@link #getInstance()}. This facilitates better stream-sharing
     * @param intervalInMilliseconds milliseconds between data emissions
     */
    @Deprecated //deprecated in 1.5.4.
    public HystrixUtilizationStream(final int intervalInMilliseconds) {
        this.intervalInMilliseconds = intervalInMilliseconds;
        this.allUtilizationStream = Observable.interval(intervalInMilliseconds, TimeUnit.MILLISECONDS)
                .map(getAllUtilization)
                .doOnSubscribe(new Action0() {
                    @Override
                    public void call() {
                        isSourceCurrentlySubscribed.set(true);
                    }
                })
                .doOnUnsubscribe(new Action0() {
                    @Override
                    public void call() {
                        isSourceCurrentlySubscribed.set(false);
                    }
                })
                .share()
                .onBackpressureDrop();
    }

Code example source: PipelineAI/pipeline

    /**
     * @deprecated Not for public use. Please use {@link #getInstance()}. This facilitates better stream-sharing
     * @param intervalInMilliseconds milliseconds between data emissions
     */
    @Deprecated //deprecated in 1.5.4.
    public HystrixConfigurationStream(final int intervalInMilliseconds) {
        this.intervalInMilliseconds = intervalInMilliseconds;
        this.allConfigurationStream = Observable.interval(intervalInMilliseconds, TimeUnit.MILLISECONDS)
                .map(getAllConfig)
                .doOnSubscribe(new Action0() {
                    @Override
                    public void call() {
                        isSourceCurrentlySubscribed.set(true);
                    }
                })
                .doOnUnsubscribe(new Action0() {
                    @Override
                    public void call() {
                        isSourceCurrentlySubscribed.set(false);
                    }
                })
                .share()
                .onBackpressureDrop();
    }

Code example source: PipelineAI/pipeline

    private Observable<R> handleRequestCacheHitAndEmitValues(final HystrixCommandResponseFromCache<R> fromCache, final AbstractCommand<R> _cmd) {
        try {
            executionHook.onCacheHit(this);
        } catch (Throwable hookEx) {
            logger.warn("Error calling HystrixCommandExecutionHook.onCacheHit", hookEx);
        }
        return fromCache.toObservableWithStateCopiedInto(this)
                .doOnTerminate(new Action0() {
                    @Override
                    public void call() {
                        if (commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.TERMINAL)) {
                            cleanUpAfterResponseFromCache(false); //user code never ran
                        } else if (commandState.compareAndSet(CommandState.USER_CODE_EXECUTED, CommandState.TERMINAL)) {
                            cleanUpAfterResponseFromCache(true); //user code did run
                        }
                    }
                })
                .doOnUnsubscribe(new Action0() {
                    @Override
                    public void call() {
                        if (commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.UNSUBSCRIBED)) {
                            cleanUpAfterResponseFromCache(false); //user code never ran
                        } else if (commandState.compareAndSet(CommandState.USER_CODE_EXECUTED, CommandState.UNSUBSCRIBED)) {
                            cleanUpAfterResponseFromCache(true); //user code did run
                        }
                    }
                });
    }

Code example source: konmik/nucleus

    .doOnUnsubscribe(new Action0() {
        @Override
        public void call() {

Code example source: PipelineAI/pipeline

    private HystrixDashboardStream(int delayInMs) {
        this.delayInMs = delayInMs;
        this.singleSource = Observable.interval(delayInMs, TimeUnit.MILLISECONDS)
                .map(new Func1<Long, DashboardData>() {
                    @Override
                    public DashboardData call(Long timestamp) {
                        return new DashboardData(
                                HystrixCommandMetrics.getInstances(),
                                HystrixThreadPoolMetrics.getInstances(),
                                HystrixCollapserMetrics.getInstances()
                        );
                    }
                })
                .doOnSubscribe(new Action0() {
                    @Override
                    public void call() {
                        isSourceCurrentlySubscribed.set(true);
                    }
                })
                .doOnUnsubscribe(new Action0() {
                    @Override
                    public void call() {
                        isSourceCurrentlySubscribed.set(false);
                    }
                })
                .share()
                .onBackpressureDrop();
    }

Code example source: PipelineAI/pipeline

    protected BucketedRollingCounterStream(HystrixEventStream<Event> stream, final int numBuckets, int bucketSizeInMs,
                                           final Func2<Bucket, Event, Bucket> appendRawEventToBucket,
                                           final Func2<Output, Bucket, Output> reduceBucket) {
        super(stream, numBuckets, bucketSizeInMs, appendRawEventToBucket);
        Func1<Observable<Bucket>, Observable<Output>> reduceWindowToSummary = new Func1<Observable<Bucket>, Observable<Output>>() {
            @Override
            public Observable<Output> call(Observable<Bucket> window) {
                return window.scan(getEmptyOutputValue(), reduceBucket).skip(numBuckets);
            }
        };
        this.sourceStream = bucketedStream      //stream broken up into buckets
                .window(numBuckets, 1)          //emit overlapping windows of buckets
                .flatMap(reduceWindowToSummary) //convert a window of bucket-summaries into a single summary
                .doOnSubscribe(new Action0() {
                    @Override
                    public void call() {
                        isSourceCurrentlySubscribed.set(true);
                    }
                })
                .doOnUnsubscribe(new Action0() {
                    @Override
                    public void call() {
                        isSourceCurrentlySubscribed.set(false);
                    }
                })
                .share()                        //multiple subscribers should get same data
                .onBackpressureDrop();          //if there are slow consumers, data should not buffer
    }

Code example source: bluelinelabs/Conductor

    @Override
    protected void onAttach(@NonNull View view) {
        super.onAttach(view);
        Log.i(TAG, "onAttach() called");
        (((ActionBarProvider) getActivity()).getSupportActionBar()).setTitle("RxLifecycle Demo");
        Observable.interval(1, TimeUnit.SECONDS)
                .doOnUnsubscribe(new Action0() {
                    @Override
                    public void call() {
                        Log.i(TAG, "Unsubscribing from onAttach()");
                    }
                })
                .compose(this.<Long>bindUntilEvent(ControllerEvent.DETACH))
                .subscribe(new Action1<Long>() {
                    @Override
                    public void call(Long num) {
                        Log.i(TAG, "Started in onAttach(), running until onDetach(): " + num);
                    }
                });
    }

Code example source: PipelineAI/pipeline

    doOnUnsubscribe(new Action0() {
        @Override
        public void call() {

Code example source: PipelineAI/pipeline

            .doOnError(markExceptionThrown)
            .doOnTerminate(singleSemaphoreRelease)
            .doOnUnsubscribe(singleSemaphoreRelease);
    } catch (RuntimeException e) {
        return Observable.error(e);

Code example source: PipelineAI/pipeline

    protected BucketedCumulativeCounterStream(HystrixEventStream<Event> stream, int numBuckets, int bucketSizeInMs,
                                              Func2<Bucket, Event, Bucket> reduceCommandCompletion,
                                              Func2<Output, Bucket, Output> reduceBucket) {
        super(stream, numBuckets, bucketSizeInMs, reduceCommandCompletion);
        this.sourceStream = bucketedStream
                .scan(getEmptyOutputValue(), reduceBucket)
                .skip(numBuckets)
                .doOnSubscribe(new Action0() {
                    @Override
                    public void call() {
                        isSourceCurrentlySubscribed.set(true);
                    }
                })
                .doOnUnsubscribe(new Action0() {
                    @Override
                    public void call() {
                        isSourceCurrentlySubscribed.set(false);
                    }
                })
                .share()               //multiple subscribers should get same data
                .onBackpressureDrop(); //if there are slow consumers, data should not buffer
    }

Code example source: PipelineAI/pipeline

    .doOnUnsubscribe(unsubscribeCommandCleanup) // perform cleanup once
    .doOnCompleted(fireOnCompletedHook);

Code example source: Netflix/zuul

    filter.applyAsync(inMesg)
            .observeOn(Schedulers.from(getChannelHandlerContext(inMesg).executor()))
            .doOnUnsubscribe(resumer::decrementConcurrency)
            .subscribe(resumer);
