rx.subjects.Subject.observeOn()方法的使用及代码示例

x33g5p2x  于2022-01-30 转载在 其他  
字(4.0k)|赞(0)|评价(0)|浏览(116)

本文整理了Java中rx.subjects.Subject.observeOn()方法的一些代码示例,展示了Subject.observeOn()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Subject.observeOn()方法的具体详情如下:
包路径:rx.subjects.Subject
类名称:Subject
方法名:observeOn

Subject.observeOn介绍

暂无

代码示例

代码示例来源:origin: wanliyang1990/WliveTV

/**
 * 子线程中执行
 * @param tag
 * @param rxBusResult
 */
public void toObserverableChildThread(final String tag, final RxBusResult rxBusResult) {
  _bus.observeOn(Schedulers.io()).subscribe(new Action1<Object>() {
    @Override
    public void call(Object o) {
      if (tags.containsKey(tag)) {
        rxBusResult.onRxBusResult(o);
      }
    }
  });
}

代码示例来源:origin: wanliyang1990/WliveTV

/**
 * 主线程中执行
 * @param tag
 * @param rxBusResult
 */
public void toObserverableOnMainThread(final String tag, final RxBusResult rxBusResult) {
    _bus.observeOn(AndroidSchedulers.mainThread()).subscribe(new Action1<Object>() {
      @Override
      public void call(Object o) {
        if (tags.containsKey(tag)) {
          rxBusResult.onRxBusResult(o);
        }
      }
    });
}

代码示例来源:origin: com.netflix.rxjava/rxjava-core

/**
 * Creates a subject whose client observers will observe events
 * propagated through the given wrapped subject.
 * @param <T> the element type
 * @param subject the subject to wrap
 * @param scheduler the target scheduler
 * @return the created subject
 */
public static <T> Subject<T, T> createScheduledSubject(Subject<T, T> subject, Scheduler scheduler) {
  final Observable<T> observedOn = subject.observeOn(scheduler);
  SubjectWrapper<T> s = new SubjectWrapper<T>(new OnSubscribe<T>() {
    @Override
    public void call(Subscriber<? super T> o) {
      subscriberOf(observedOn).call(o);
    }
  }, subject);
  return s;
}

代码示例来源:origin: couchbase/couchbase-jvm-core

@Override
@SuppressWarnings("unchecked")
public <R extends CouchbaseResponse> Observable<R> send(CouchbaseRequest request) {
  if (request instanceof InternalRequest) {
    handleInternalRequest(request);
    return (Observable<R>) request.observable().observeOn(environment.scheduler());
  } else if (request instanceof ClusterRequest) {
    handleClusterRequest(request);
    return (Observable<R>) request.observable().observeOn(environment.scheduler());
  } else {
    RingBufferMonitor ringBufferMonitor = RingBufferMonitor.instance();
    ringBufferMonitor.addRequest(request);
    if (coreSendHook == null) {
      boolean published = requestRingBuffer.tryPublishEvent(REQUEST_TRANSLATOR, request);
      if (!published) {
        request.observable().onError(ringBufferMonitor.createException());
      }
      return (Observable<R>) request.observable();
    } else {
      Subject<CouchbaseResponse, CouchbaseResponse> response = request.observable();
      Tuple2<CouchbaseRequest, Observable<CouchbaseResponse>> hook = coreSendHook
          .beforeSend(request, response);
      boolean published = requestRingBuffer.tryPublishEvent(REQUEST_TRANSLATOR, hook.value1());
      if (!published) {
        response.onError(ringBufferMonitor.createException());
      }
      return (Observable<R>) hook.value2();
    }
  }
}

代码示例来源:origin: com.couchbase.client/core-io

@Override
@SuppressWarnings("unchecked")
public <R extends CouchbaseResponse> Observable<R> send(CouchbaseRequest request) {
  if (request instanceof InternalRequest) {
    handleInternalRequest(request);
    return (Observable<R>) request.observable().observeOn(environment.scheduler());
  } else if (request instanceof ClusterRequest) {
    handleClusterRequest(request);
    return (Observable<R>) request.observable().observeOn(environment.scheduler());
  } else {
    RingBufferMonitor ringBufferMonitor = RingBufferMonitor.instance();
    ringBufferMonitor.addRequest(request);
    if (coreSendHook == null) {
      boolean published = requestRingBuffer.tryPublishEvent(REQUEST_TRANSLATOR, request);
      if (!published) {
        request.observable().onError(ringBufferMonitor.createException());
      }
      return (Observable<R>) request.observable();
    } else {
      Subject<CouchbaseResponse, CouchbaseResponse> response = request.observable();
      Tuple2<CouchbaseRequest, Observable<CouchbaseResponse>> hook = coreSendHook
          .beforeSend(request, response);
      boolean published = requestRingBuffer.tryPublishEvent(REQUEST_TRANSLATOR, hook.value1());
      if (!published) {
        response.onError(ringBufferMonitor.createException());
      }
      return (Observable<R>) hook.value2();
    }
  }
}

相关文章