Usage and code examples of the rx.Observable.doOnUnsubscribe() method

Reposted by x33g5p2x on 2022-01-25 in Other

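This article collects code examples of the Java method rx.Observable.doOnUnsubscribe() and shows how Observable.doOnUnsubscribe() is used in practice. The examples come mainly from GitHub, Stack Overflow, Maven, and similar platforms, and were extracted from selected projects, so they should be a useful reference. Details of the Observable.doOnUnsubscribe() method:
Package path: rx.Observable
Class name: Observable
Method name: doOnUnsubscribe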

About Observable.doOnUnsubscribe

Modifies the source Observable so that it invokes the given action when its subscribers unsubscribe from it. Each unsubscription results in an invocation of the given action, except when the source Observable is reference counted, in which case the action is invoked only for the very last unsubscription.

Scheduler: doOnUnsubscribe does not operate by default on a particular Scheduler.
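
The behaviour described above can be illustrated with a minimal sketch, assuming RxJava 1.x is on the classpath. The class DoOnUnsubscribeDemo and its helper methods are hypothetical names chosen for this illustration; they do not come from any of the projects quoted below. The first helper attaches doOnUnsubscribe directly, so each of the two unsubscriptions should run the action. The second places share() after it, so the source is reference counted and the action should run only when the last subscriber leaves.

```java
import java.util.concurrent.atomic.AtomicInteger;

import rx.Observable;
import rx.Subscription;
import rx.functions.Action0;
import rx.subjects.PublishSubject;

public class DoOnUnsubscribeDemo {

    // Hypothetical helper: an action that counts how often it is invoked.
    private static Action0 countingAction(final AtomicInteger calls) {
        return new Action0() {
            @Override
            public void call() {
                calls.incrementAndGet();
            }
        };
    }

    // Without reference counting: returns the number of action invocations
    // after two independent subscribers have both unsubscribed.
    public static int plainUnsubscribeCount() {
        AtomicInteger calls = new AtomicInteger();
        PublishSubject<Integer> source = PublishSubject.create();
        Observable<Integer> observed = source.doOnUnsubscribe(countingAction(calls));

        Subscription first = observed.subscribe();
        Subscription second = observed.subscribe();
        first.unsubscribe();
        second.unsubscribe();
        return calls.get();
    }

    // With share() (publish().refCount()) downstream of doOnUnsubscribe:
    // returns the invocation count after the first and after the last unsubscription.
    public static int[] sharedUnsubscribeCounts() {
        AtomicInteger calls = new AtomicInteger();
        PublishSubject<Integer> source = PublishSubject.create();
        Observable<Integer> shared = source.doOnUnsubscribe(countingAction(calls)).share();

        Subscription first = shared.subscribe();
        Subscription second = shared.subscribe();
        first.unsubscribe();
        int afterFirst = calls.get();
        second.unsubscribe();
        int afterLast = calls.get();
        return new int[] {afterFirst, afterLast};
    }

    public static void main(String[] args) {
        System.out.println("plain:  " + plainUnsubscribeCount());
        int[] shared = sharedUnsubscribeCounts();
        System.out.println("shared: " + shared[0] + " then " + shared[1]);
    }
}
```

Under these assumptions the plain chain should report two invocations, while the shared chain should report none after the first unsubscription and one after the last. This is the same arrangement the Hystrix stream examples below rely on when they toggle isSourceCurrentlySubscribed ahead of share().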

Code examples

Code example source: PipelineAI/pipeline

public CollapsedRequestSubject(final R arg, final RequestBatch<?, T, R> containingBatch) {
  if (arg == RequestCollapser.NULL_SENTINEL) {
    this.argument = null;
  } else {
    this.argument = arg;
  }
  this.subjectWithAccounting = subject
      .doOnSubscribe(new Action0() {
        @Override
        public void call() {
          outstandingSubscriptions++;
        }
      })
      .doOnUnsubscribe(new Action0() {
        @Override
        public void call() {
          outstandingSubscriptions--;
          if (outstandingSubscriptions == 0) {
            containingBatch.remove(arg);
          }
        }
      });
}

Code example source: PipelineAI/pipeline

public Observable<R> toObservableWithStateCopiedInto(final AbstractCommand<R> commandToCopyStateInto) {
  final AtomicBoolean completionLogicRun = new AtomicBoolean(false);
  return cachedObservable
      .doOnError(new Action1<Throwable>() {
        @Override
        public void call(Throwable throwable) {
          if (completionLogicRun.compareAndSet(false, true)) {
            commandCompleted(commandToCopyStateInto);
          }
        }
      })
      .doOnCompleted(new Action0() {
        @Override
        public void call() {
          if (completionLogicRun.compareAndSet(false, true)) {
            commandCompleted(commandToCopyStateInto);
          }
        }
      })
      .doOnUnsubscribe(new Action0() {
        @Override
        public void call() {
          if (completionLogicRun.compareAndSet(false, true)) {
            commandUnsubscribed(commandToCopyStateInto);
          }
        }
      });
}

Code example source: bluelinelabs/Conductor

@NonNull
@Override
protected View onCreateView(@NonNull LayoutInflater inflater, @NonNull ViewGroup container) {
  Log.i(TAG, "onCreateView() called");
  View view = inflater.inflate(R.layout.controller_lifecycle, container, false);
  view.setBackgroundColor(ContextCompat.getColor(container.getContext(), R.color.red_300));
  unbinder = ButterKnife.bind(this, view);
  tvTitle.setText(getResources().getString(R.string.rxlifecycle_title, TAG));
  Observable.interval(1, TimeUnit.SECONDS)
      .doOnUnsubscribe(new Action0() {
        @Override
        public void call() {
          Log.i(TAG, "Unsubscribing from onCreateView()");
        }
      })
      .compose(this.<Long>bindUntilEvent(ControllerEvent.DESTROY_VIEW))
      .subscribe(new Action1<Long>() {
        @Override
        public void call(Long num) {
          Log.i(TAG, "Started in onCreateView(), running until onDestroyView(): " + num);
        }
      });
  return view;
}

Code example source: bluelinelabs/Conductor

public RxLifecycleController() {
  Observable.interval(1, TimeUnit.SECONDS)
      .doOnUnsubscribe(new Action0() {
        @Override
        public void call() {
          Log.i(TAG, "Unsubscribing from constructor");
        }
      })
      .compose(this.<Long>bindUntilEvent(ControllerEvent.DESTROY))
      .subscribe(new Action1<Long>() {
        @Override
        public void call(Long num) {
          Log.i(TAG, "Started in constructor, running until onDestroy(): " + num);
        }
      });
}

Code example source: PipelineAI/pipeline

/**
 * @deprecated Not for public use.  Please use {@link #getInstance()}.  This facilitates better stream-sharing
 * @param intervalInMilliseconds milliseconds between data emissions
 */
@Deprecated //deprecated in 1.5.4.
public HystrixUtilizationStream(final int intervalInMilliseconds) {
  this.intervalInMilliseconds = intervalInMilliseconds;
  this.allUtilizationStream = Observable.interval(intervalInMilliseconds, TimeUnit.MILLISECONDS)
      .map(getAllUtilization)
      .doOnSubscribe(new Action0() {
        @Override
        public void call() {
          isSourceCurrentlySubscribed.set(true);
        }
      })
      .doOnUnsubscribe(new Action0() {
        @Override
        public void call() {
          isSourceCurrentlySubscribed.set(false);
        }
      })
      .share()
      .onBackpressureDrop();
}

Code example source: PipelineAI/pipeline

/**
 * @deprecated Not for public use.  Please use {@link #getInstance()}.  This facilitates better stream-sharing
 * @param intervalInMilliseconds milliseconds between data emissions
 */
@Deprecated //deprecated in 1.5.4.
public HystrixConfigurationStream(final int intervalInMilliseconds) {
  this.intervalInMilliseconds = intervalInMilliseconds;
  this.allConfigurationStream = Observable.interval(intervalInMilliseconds, TimeUnit.MILLISECONDS)
      .map(getAllConfig)
      .doOnSubscribe(new Action0() {
        @Override
        public void call() {
          isSourceCurrentlySubscribed.set(true);
        }
      })
      .doOnUnsubscribe(new Action0() {
        @Override
        public void call() {
          isSourceCurrentlySubscribed.set(false);
        }
      })
      .share()
      .onBackpressureDrop();
}

Code example source: PipelineAI/pipeline

private Observable<R> handleRequestCacheHitAndEmitValues(final HystrixCommandResponseFromCache<R> fromCache, final AbstractCommand<R> _cmd) {
  try {
    executionHook.onCacheHit(this);
  } catch (Throwable hookEx) {
    logger.warn("Error calling HystrixCommandExecutionHook.onCacheHit", hookEx);
  }
  return fromCache.toObservableWithStateCopiedInto(this)
      .doOnTerminate(new Action0() {
        @Override
        public void call() {
          if (commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.TERMINAL)) {
            cleanUpAfterResponseFromCache(false); //user code never ran
          } else if (commandState.compareAndSet(CommandState.USER_CODE_EXECUTED, CommandState.TERMINAL)) {
            cleanUpAfterResponseFromCache(true); //user code did run
          }
        }
      })
      .doOnUnsubscribe(new Action0() {
        @Override
        public void call() {
          if (commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.UNSUBSCRIBED)) {
            cleanUpAfterResponseFromCache(false); //user code never ran
          } else if (commandState.compareAndSet(CommandState.USER_CODE_EXECUTED, CommandState.UNSUBSCRIBED)) {
            cleanUpAfterResponseFromCache(true); //user code did run
          }
        }
      });
}

Code example source: konmik/nucleus

.doOnUnsubscribe(new Action0() {
  @Override
  public void call() {

Code example source: PipelineAI/pipeline

private HystrixDashboardStream(int delayInMs) {
  this.delayInMs = delayInMs;
  this.singleSource = Observable.interval(delayInMs, TimeUnit.MILLISECONDS)
      .map(new Func1<Long, DashboardData>() {
        @Override
        public DashboardData call(Long timestamp) {
          return new DashboardData(
              HystrixCommandMetrics.getInstances(),
              HystrixThreadPoolMetrics.getInstances(),
              HystrixCollapserMetrics.getInstances()
          );
        }
      })
      .doOnSubscribe(new Action0() {
        @Override
        public void call() {
          isSourceCurrentlySubscribed.set(true);
        }
      })
      .doOnUnsubscribe(new Action0() {
        @Override
        public void call() {
          isSourceCurrentlySubscribed.set(false);
        }
      })
      .share()
      .onBackpressureDrop();
}

Code example source: PipelineAI/pipeline

protected BucketedRollingCounterStream(HystrixEventStream<Event> stream, final int numBuckets, int bucketSizeInMs,
                    final Func2<Bucket, Event, Bucket> appendRawEventToBucket,
                    final Func2<Output, Bucket, Output> reduceBucket) {
  super(stream, numBuckets, bucketSizeInMs, appendRawEventToBucket);
  Func1<Observable<Bucket>, Observable<Output>> reduceWindowToSummary = new Func1<Observable<Bucket>, Observable<Output>>() {
    @Override
    public Observable<Output> call(Observable<Bucket> window) {
      return window.scan(getEmptyOutputValue(), reduceBucket).skip(numBuckets);
    }
  };
  this.sourceStream = bucketedStream      //stream broken up into buckets
      .window(numBuckets, 1)          //emit overlapping windows of buckets
      .flatMap(reduceWindowToSummary) //convert a window of bucket-summaries into a single summary
      .doOnSubscribe(new Action0() {
        @Override
        public void call() {
          isSourceCurrentlySubscribed.set(true);
        }
      })
      .doOnUnsubscribe(new Action0() {
        @Override
        public void call() {
          isSourceCurrentlySubscribed.set(false);
        }
      })
      .share()                        //multiple subscribers should get same data
      .onBackpressureDrop();          //if there are slow consumers, data should not buffer
}

Code example source: bluelinelabs/Conductor

@Override
protected void onAttach(@NonNull View view) {
  super.onAttach(view);
  Log.i(TAG, "onAttach() called");
  (((ActionBarProvider)getActivity()).getSupportActionBar()).setTitle("RxLifecycle Demo");
  Observable.interval(1, TimeUnit.SECONDS)
      .doOnUnsubscribe(new Action0() {
        @Override
        public void call() {
          Log.i(TAG, "Unsubscribing from onAttach()");
        }
      })
      .compose(this.<Long>bindUntilEvent(ControllerEvent.DETACH))
      .subscribe(new Action1<Long>() {
        @Override
        public void call(Long num) {
          Log.i(TAG, "Started in onAttach(), running until onDetach(): " + num);
        }
      });
}

Code example source: PipelineAI/pipeline

doOnUnsubscribe(new Action0() {
  @Override
  public void call() {

Code example source: PipelineAI/pipeline

      .doOnError(markExceptionThrown)
      .doOnTerminate(singleSemaphoreRelease)
      .doOnUnsubscribe(singleSemaphoreRelease);
} catch (RuntimeException e) {
  return Observable.error(e);

Code example source: PipelineAI/pipeline

protected BucketedCumulativeCounterStream(HystrixEventStream<Event> stream, int numBuckets, int bucketSizeInMs,
                     Func2<Bucket, Event, Bucket> reduceCommandCompletion,
                     Func2<Output, Bucket, Output> reduceBucket) {
  super(stream, numBuckets, bucketSizeInMs, reduceCommandCompletion);
  this.sourceStream = bucketedStream
      .scan(getEmptyOutputValue(), reduceBucket)
      .skip(numBuckets)
      .doOnSubscribe(new Action0() {
        @Override
        public void call() {
          isSourceCurrentlySubscribed.set(true);
        }
      })
      .doOnUnsubscribe(new Action0() {
        @Override
        public void call() {
          isSourceCurrentlySubscribed.set(false);
        }
      })
      .share()                        //multiple subscribers should get same data
      .onBackpressureDrop();          //if there are slow consumers, data should not buffer
}

Code example source: PipelineAI/pipeline

.doOnUnsubscribe(unsubscribeCommandCleanup) // perform cleanup once
.doOnCompleted(fireOnCompletedHook);

Code example source: Netflix/zuul

filter.applyAsync(inMesg)
  .observeOn(Schedulers.from(getChannelHandlerContext(inMesg).executor()))
  .doOnUnsubscribe(resumer::decrementConcurrency)
  .subscribe(resumer);
