rx.Observable.timeout()方法的使用及代码示例

x33g5p2x  于2022-01-25 转载在 其他  
字(6.6k)|赞(0)|评价(0)|浏览(186)

本文整理了Java中rx.Observable.timeout()方法的一些代码示例,展示了Observable.timeout()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Observable.timeout()方法的具体详情如下:
包路径:rx.Observable
类名称:Observable
方法名:timeout

Observable.timeout介绍

[英]Returns an Observable that mirrors the source Observable but applies a timeout policy for each emitted item. If the next item isn't emitted within the specified timeout duration starting from its predecessor, the resulting Observable terminates and notifies observers of a TimeoutException.

Scheduler: This version of timeout operates by default on the computation Scheduler.
[中]返回一个Observable,该Observable镜像源Observable,但为每个发出的项应用超时策略。如果下一个项目没有在从其前一个项目开始的指定超时持续时间内发出,则生成的Observable将终止并通知观察者TimeoutException。
调度程序:此版本的超时默认在计算调度程序上运行。

代码示例

代码示例来源:origin: Netflix/servo

final CountDownLatch completed = new CountDownLatch(1);
final Subscription s = Observable.mergeDelayError(Observable.from(batches))
  .timeout(timeoutMillis, TimeUnit.MILLISECONDS)
  .subscribeOn(Schedulers.immediate())
  .subscribe(updated::addAndGet, exc -> {

代码示例来源:origin: apache/incubator-gobblin

if (callback == null) {
 return new WriteResponseFuture<>(
   observable.timeout(_operationTimeout, _operationTimeunit).toBlocking().toFuture(),
   _defaultWriteResponseMapper);
} else {
 observable.timeout(_operationTimeout, _operationTimeunit)
   .subscribe(new Subscriber<D>() {
    @Override

代码示例来源:origin: THEONE10211024/RxJavaSamples

private Observable<String> _getObservableTask_2sToComplete() {
  return Observable.create(new Observable.OnSubscribe<String>() {
    @Override
    public void call(Subscriber<? super String> subscriber) {
      _log(String.format("Starting a 2s task"));
      subscriber.onNext("2 s");
      try {
        Thread.sleep(2000);
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
      subscriber.onCompleted();
    }
  }).subscribeOn(Schedulers.computation()).timeout(3, TimeUnit.SECONDS);
}

代码示例来源:origin: THEONE10211024/RxJavaSamples

@OnClick(R.id.btn_demo_timeout_1_5s)
public void onStart5sTask() {
  _subscription = _getObservableFor5sTask()//
     .timeout(2, TimeUnit.SECONDS, _getTimeoutObservable())
     .subscribeOn(Schedulers.computation())
     .observeOn(AndroidSchedulers.mainThread())
     .subscribe(_getEventCompletionObserver());
}

代码示例来源:origin: Q42/RxPromise

/**
 * Returns a promise that mirrors the source promise but applies a timeout policy.
 * If the promise isn't completed within the specified timeout duration,
 * the resulting promise is rejected with a {@code TimeoutException}.
 */
public Promise<T> timeout(long timeout, TimeUnit timeUnit) {
  return new Promise<T>(observable.timeout(timeout, timeUnit));
}

代码示例来源:origin: meltwater/rxrabbit

@Override
public Observable<Message> call(Observable<Message> input) {
  final AtomicLong consumedCount = new AtomicLong(0);
  return input
      .doOnNext(message -> message.acknowledger.ack())
      .timeout(timeout, timeUnit, just(STOP))
      .takeUntil(message -> message == STOP || consumedEnough(consumedCount.incrementAndGet()) )
      .filter(message -> message != STOP);
}

代码示例来源:origin: MrFuFuFu/RxFace

public Observable<FaceResponse> getDataPost(CustomMultipartTypedOutput listMultipartOutput) {
  return mWebService.uploadImagePost(listMultipartOutput)
      .timeout(Constants.TIME_OUT, TimeUnit.MILLISECONDS)
      .concatMap(new Func1<FaceResponse, Observable<FaceResponse>>() {
        @Override
        public Observable<FaceResponse> call(FaceResponse faceResponse) {
          return faceResponse.filterWebServiceErrors();
        }
      }).compose(SchedulersCompat.<FaceResponse>applyExecutorSchedulers());
}

代码示例来源:origin: MrFuFuFu/RxFace

public Observable<FaceResponse> getDataUrlGet(Map<String, String> options) {
    return mWebService.uploadImageUrlGet(options)
        .timeout(Constants.TIME_OUT, TimeUnit.MILLISECONDS)
        .concatMap(new Func1<FaceResponse, Observable<FaceResponse>>() {
          @Override
          public Observable<FaceResponse> call(FaceResponse faceResponse) {
            return faceResponse.filterWebServiceErrors();
          }
        }).compose(SchedulersCompat.<FaceResponse>applyExecutorSchedulers());//http://www.jianshu.com/p/e9e03194199e
  }
}

代码示例来源:origin: nurkiewicz/rxjava-book-examples

@Test
public void sample_67() throws Exception {
  rxLookupFlight("LOT 783")
      .subscribeOn(Schedulers.io())
      .timeout(100, TimeUnit.MILLISECONDS);
}

代码示例来源:origin: nurkiewicz/rxjava-book-examples

@Override
  public Observable<LocalDate> externalCall() {
    return delegate
        .externalCall()
        .timeout(1, TimeUnit.SECONDS,
           Observable.empty(),
           scheduler);
  }
}

代码示例来源:origin: nurkiewicz/rxjava-book-examples

@Test
public void sample_281() throws Exception {
  risky()
      .timeout(1, SECONDS)
      .doOnError(th -> log.warn("Will retry", th))
      .retry()
      .subscribe(log::info);
}

代码示例来源:origin: eventuate-clients/eventuate-client-java

public void assertClientIsDisconnected() {
  rx.Observable.interval(50, TimeUnit.MILLISECONDS)
      .filter(ignore -> isClientDisconnected())
      .take(1).timeout(300, TimeUnit.MILLISECONDS).toBlocking().first();
 }
}

代码示例来源:origin: nurkiewicz/rxjava-book-examples

@Test
public void sample_249() throws Exception {
  Observable
      .interval(99, MILLISECONDS)
      .debounce(100, MILLISECONDS)
      .timeout(1, SECONDS);
}

代码示例来源:origin: nurkiewicz/rxjava-book-examples

@Test
public void sample_303() throws Exception {
  risky()
      .timeout(1, SECONDS)
      .retry(10);
}

代码示例来源:origin: nurkiewicz/rxjava-book-examples

@Test
public void sample_72() throws Exception {
  final Observable<Long> upstream = Observable.interval(99, MILLISECONDS);
  upstream
      .debounce(100, MILLISECONDS)
      .timeout(1, SECONDS, upstream
          .take(1)
          .concatWith(
              upstream
                  .debounce(100, MILLISECONDS)
                  .timeout(1, SECONDS, upstream)));
}

代码示例来源:origin: nurkiewicz/rxjava-book-examples

@Test
public void sample_74() throws Exception {
  risky()
      .timeout(1, SECONDS)
      .retryWhen(failures -> failures
          .zipWith(Observable.range(1, ATTEMPTS), (err, attempt) ->
              attempt < ATTEMPTS ?
                  Observable.timer(1, SECONDS) :
                  Observable.error(err))
          .flatMap(x -> x)
      );
}

代码示例来源:origin: eventuate-clients/eventuate-client-java

public void assertSubscribed() {
 frames.timeout(30, TimeUnit.SECONDS)
     .filter(frame -> frame.frame().getCommand().equals(Frame.Command.SUBSCRIBE)).take(1).timeout(720, TimeUnit.SECONDS)
     .toBlocking().first();
}
public void assertAcked(int count) {

代码示例来源:origin: nurkiewicz/rxjava-book-examples

@Test
public void sample_89() throws Exception {
  risky()
      .timeout(1, SECONDS)
      .retryWhen(failures -> failures
          .zipWith(Observable.range(1, ATTEMPTS),
              this::handleRetryAttempt)
          .flatMap(x -> x)
      );
}

代码示例来源:origin: nurkiewicz/rxjava-book-examples

@Test
  public void sample_66() throws Exception {
    risky()
        .timeout(1, SECONDS)
//                .retryWhen(failures -> failures.take(10))
        .retryWhen(failures -> failures.delay(1, SECONDS));
  }

代码示例来源:origin: nurkiewicz/rxjava-book-examples

@Test
public void sample_48() throws Exception {
  ConnectableObservable<Long> upstream = Observable
      .interval(99, MILLISECONDS)
      .publish();
  upstream
      .debounce(100, MILLISECONDS)
      .timeout(1, SECONDS, upstream.take(1));
  upstream.connect();
}

相关文章

Observable类方法