rx.Observable.takeUntil()方法的使用及代码示例

x33g5p2x  于2022-01-25 转载在 其他  
字(12.7k)|赞(0)|评价(0)|浏览(230)

本文整理了Java中rx.Observable.takeUntil()方法的一些代码示例,展示了Observable.takeUntil()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Observable.takeUntil()方法的具体详情如下:
包路径:rx.Observable
类名称:Observable
方法名:takeUntil

Observable.takeUntil介绍

[英]Returns an Observable that emits the items emitted by the source Observable until a second Observable emits an item.

Scheduler: takeUntil does not operate by default on a particular Scheduler.
[中]返回一个Observable,该Observable发出源Observable发出的项,直到第二个Observable发出一个项为止。
调度程序:默认情况下,TakeTill不会在特定调度程序上运行。

代码示例

代码示例来源:origin: BaronZ88/MinimalistWeather

.filter(weather -> weather != null && !TextUtils.isEmpty(weather.getCityId()))
.distinct(weather -> weather.getWeatherLive().getTime())
.takeUntil(weather -> !refreshNow && System.currentTimeMillis() - weather.getWeatherLive().getTime() <= 15 * 60 * 1000);

代码示例来源:origin: leeowenowen/rxjava-examples

@Override
 public void run() {
  Observable.range(1, 10).takeUntil(new Func1<Integer, Boolean>() {
   @Override
   public Boolean call(Integer i) {
    return i > 3;
   }
  }).subscribe(new Action1<Integer>() {
   @Override
   public void call(Integer integer) {
    log(integer);
   }
  });
 }
});

代码示例来源:origin: meltwater/rxrabbit

@Override
public Observable<Message> call(Observable<Message> input) {
  final AtomicLong consumedCount = new AtomicLong(0);
  return input
      .doOnNext(message -> message.acknowledger.ack())
      .timeout(timeout, timeUnit, just(STOP))
      .takeUntil(message -> message == STOP || consumedEnough(consumedCount.incrementAndGet()) )
      .filter(message -> message != STOP);
}

代码示例来源:origin: Aptoide/aptoide-client-v8

@Override public Observable<Download> getDownload(String md5) {
 return downloadsRepository.getDownload(md5)
   .flatMap(download -> {
    if (download == null || isFileMissingFromCompletedDownload(download)) {
     return Observable.error(new DownloadNotFoundException());
    } else {
     return Observable.just(download);
    }
   })
   .takeUntil(
     storedDownload -> storedDownload.getOverallDownloadStatus() == Download.COMPLETED);
}

代码示例来源:origin: codeabovelab/haven-platform

public void subscribeToChanges(Action1<ResultCode> action, int checkIntervalInSec) {
  monitor.subscribeOn(Schedulers.newThread()).subscribe(action);
  Observable.interval(checkIntervalInSec, TimeUnit.SECONDS).takeUntil(t -> monitor.hasCompleted()).subscribe(observer);
}

代码示例来源:origin: org.zalando.paradox/paradox-nakadi-consumer-core

private <T> Observable.Transformer<T, Long> zipWithFlatMap(final String reason) {
  return
    observable ->
      observable.zipWith(
          Observable.range(1, Integer.MAX_VALUE), (t, repeatAttempt) -> {
            // Void or Throwable
            if (t instanceof Throwable) {
              log.warn("Exception [{}]", getMessage((Throwable)t));
            }
            return repeatAttempt;
          }).flatMap(repeatAttempt -> {
            final long retryAfterMillis = httpReactiveHandler.getRetryAfterMillis();
            checkArgument(retryAfterMillis > 0, "RetryAfterMillis must be greater than 0");
            log.debug("Restart after [{}] ms running [{}] reason [{}] attempt : [{}]", retryAfterMillis,
                running.get(), reason, repeatAttempt);
            return Observable.timer(retryAfterMillis, TimeUnit.MILLISECONDS);
           }).takeUntil((stopPredicate) -> !running.get());
}
//J+

代码示例来源:origin: hawkular/hawkular-metrics

public void run() {
  logger.info("Checking for expired temp tables");
  Observable.interval(1, TimeUnit.DAYS, Schedulers.io())
      .takeUntil(i -> finished)
      .flatMap(i -> session.execute(findTables.bind()))
      .compose(applyRetryPolicy())
      .flatMap(Observable::from)
      .filter(row -> row.getString(0).startsWith(DataAccessImpl.TEMP_TABLE_NAME_PROTOTYPE))
      .map(row -> row.getString(0))
      .filter(this::isTableExpired)
      .flatMap(this::dropTable)
      .subscribe(
          table -> logger.infof("Dropped table %s", table),
          t -> logger.warn("Cleaning temp tables failed", t),
          () -> logger.infof("Finished cleaning expired temp tables")
      );
}

代码示例来源:origin: org.hawkular.metrics/hawkular-metrics-core-service

public void run() {
  logger.info("Checking for expired temp tables");
  Observable.interval(1, TimeUnit.DAYS, Schedulers.io())
      .takeUntil(i -> finished)
      .flatMap(i -> session.execute(findTables.bind()))
      .compose(applyRetryPolicy())
      .flatMap(Observable::from)
      .filter(row -> row.getString(0).startsWith(DataAccessImpl.TEMP_TABLE_NAME_PROTOTYPE))
      .map(row -> row.getString(0))
      .filter(this::isTableExpired)
      .flatMap(this::dropTable)
      .subscribe(
          table -> logger.infof("Dropped table %s", table),
          t -> logger.warn("Cleaning temp tables failed", t),
          () -> logger.infof("Finished cleaning expired temp tables")
      );
}

代码示例来源:origin: hawkular/hawkular-metrics

private void doOnTick(Action0 action) {
  Action0 wrapper = () -> {
    Date timeSlice = getTimeSlice(new DateTime(tickScheduler.now()), minutes(1).toStandardDuration()).toDate();
    action.call();
  };
  AtomicReference<DateTime> previousTimeSliceRef = new AtomicReference<>();
  // TODO Emit ticks at the start of every minute
  Observable.interval(0, 1, TimeUnit.MINUTES, tickScheduler)
      .filter(tick -> {
        DateTime time = currentMinute();
        if (previousTimeSliceRef.get() == null) {
          previousTimeSliceRef.set(time);
          return true;
        }
        if (previousTimeSliceRef.get().equals(time)) {
          return false;
        }
        previousTimeSliceRef.set(time);
        return true;
      })
      .takeUntil(d -> !running)
      .subscribe(tick -> wrapper.call(), t -> logger.warn(t));
}

代码示例来源:origin: Aptoide/aptoide-client-v8

/**
 * Observe changes to a download. This observable never completes it will emmit items whenever
 * the download state changes.
 *
 * @return observable for download state changes.
 */
public Observable<Download> getDownload(String md5) {
 return downloadAccessor.get(md5)
   .flatMap(download -> {
    if (download == null || (download.getOverallDownloadStatus() == Download.COMPLETED
      && getStateIfFileExists(download) == Download.FILE_MISSING)) {
     return Observable.error(new DownloadNotFoundException());
    } else {
     return Observable.just(download);
    }
   })
   .takeUntil(
     storedDownload -> storedDownload.getOverallDownloadStatus() == Download.COMPLETED);
}

代码示例来源:origin: com.trunk.rx.json/rxjava-json-core

@Override
public Observable<JsonPathEvent> call(Observable<JsonTokenEvent> upstream) {
 ConcurrentHashMap<JsonPath, Integer> visitedMatchers = new ConcurrentHashMap<>();
 ConcurrentHashMap<JsonPath, Integer> completedMatchers = new ConcurrentHashMap<>();
 matchers.forEach(jsonPath -> {
  visitedMatchers.put(jsonPath, 0);
  completedMatchers.put(jsonPath, 0);
 });
 return upstream
  .takeUntil(ignore -> !lenient && allMatchersComplete(visitedMatchers, completedMatchers))
  .concatMap( // order is important
   jsonTokenEvent ->
    matches(jsonTokenEvent, visitedMatchers, completedMatchers)
     .concatWith(
      jsonTokenEvent.getToken() == JsonDocumentEnd.instance() ?
       Observable.just(new JsonPathEvent(NoopToken.instance(), jsonTokenEvent)) :
       Observable.empty()
     )
  );
}

代码示例来源:origin: techery/janet

private <A> Observable<ActionState<A>> send(final A action) {
  return pipeline.asObservable()
      .filter(new Func1<ActionPair, Boolean>() {
        @Override public Boolean call(ActionPair pair) {
          return pair.holder.isOrigin(action);
        }
      })
      .map(new Func1<ActionPair, ActionState>() {
        @Override public ActionState call(ActionPair pair) {
          return pair.state;
        }
      })
      .compose(new CastToState<A>())
      .mergeWith(Observable.<ActionState<A>>empty()
          .doOnSubscribe(new Action0() {
            @Override public void call() {
              doSend(action);
            }
          }))
      .takeUntil(new Func1<ActionState, Boolean>() {
        @Override public Boolean call(ActionState actionState) {
          return actionState.status == ActionState.Status.SUCCESS
              || actionState.status == ActionState.Status.FAIL;
        }
      });
}

代码示例来源:origin: org.jboss.hal/hal-dmr

/**
 * Executes the composite operation until the operation successfully returns and the precondition is met.
 * The precondition receives the composite result of the operation.
 */
@SuppressWarnings("HardCodedStringLiteral")
public static Completable repeatCompositeUntil(Dispatcher dispatcher, int timeout, Composite composite,
    @Nullable Predicate<CompositeResult> until) {
  logger.debug("Repeat {} using {} seconds as timeout", composite, timeout);
  Single<CompositeResult> execution = Single.fromEmitter(em -> dispatcher.execute(composite, em::onSuccess,
      (op, fail) -> em.onSuccess(compositeFailure("Dispatcher failure: " + fail)),
      (op, ex) -> em.onSuccess(compositeFailure("Dispatcher exception: " + ex.getMessage()))));
  if (until == null) {
    until = r -> r.stream().noneMatch(ModelNode::isFailure); // default: until success
  }
  return Observable
      .interval(INTERVAL, MILLISECONDS) // execute a operation each INTERVAL millis
      .doOnEach(n -> logger.debug("#{}: execute {}", n.getValue(), composite))
      .flatMapSingle(n -> execution, false, 1)
      .takeUntil(until::test) // until succeeded
      .toCompletable().timeout(timeout, SECONDS); // wait succeeded or stop after timeout seconds
}

代码示例来源:origin: Aptoide/aptoide-client-v8

private Observable<Download> handleDownloadProgress(AppDownloader appDownloader) {
 return appDownloader.observeDownloadProgress()
   .flatMap(appDownloadStatus -> downloadsRepository.getDownload(appDownloadStatus.getMd5())
     .first()
     .flatMap(download -> updateDownload(download, appDownloadStatus)))
   .filter(download -> download.getOverallDownloadStatus() == Download.COMPLETED)
   .doOnNext(download -> removeAppDownloader(download.getMd5()))
   .doOnNext(download -> downloadAnalytics.onDownloadComplete(download.getMd5(),
     download.getPackageName(), download.getVersionCode()))
   .takeUntil(download -> download.getOverallDownloadStatus() == Download.COMPLETED);
}

代码示例来源:origin: akarnokd/akarnokd-misc

static <T> Observable.Transformer<T, T> debounceFirst(long timeout, TimeUnit unit) {
    return f ->
      f.publish(g ->
        g.take(1)
        .concatWith(
          g.switchMap(u -> Observable.timer(timeout, unit).map(w -> u))
          .take(1)
          .ignoreElements()
        )
        .repeatWhen(h -> h.takeUntil(g.ignoreElements()))
      )
      ;
  }
}

代码示例来源:origin: org.jboss.hal/hal-dmr

/**
 * Executes the operation until the operation successfully returns and the precondition is met. The precondition
 * receives the result of the operation.
 */
@SuppressWarnings("HardCodedStringLiteral")
public static Completable repeatOperationUntil(Dispatcher dispatcher, int timeout, Operation operation,
    @Nullable Predicate<ModelNode> until) {
  logger.debug("Repeat {} using {} seconds timeout", operation.asCli(), timeout);
  Single<ModelNode> execution = Single.fromEmitter(em -> dispatcher.execute(operation, em::onSuccess,
      (op, fail) -> em.onSuccess(operationFailure("Dispatcher failure: " + fail)),
      (op, ex) -> em.onSuccess(operationFailure("Dispatcher exception: " + ex.getMessage()))));
  if (until == null) {
    until = r -> !r.isFailure(); // default: until success
  }
  return Observable
      .interval(INTERVAL, MILLISECONDS) // execute a operation each INTERVAL millis
      .doOnEach(n -> logger.debug("#{}: execute {}", n.getValue(), operation.asCli()))
      .flatMapSingle(n -> execution, false, 1)
      .takeUntil(until::test) // until succeeded
      .toCompletable().timeout(timeout, SECONDS); // wait succeeded or stop after timeout seconds
}

代码示例来源:origin: Aptoide/aptoide-client-v8

private Observable<FileDownloadCallback> handleFileDownloadProgress(
  FileDownloader fileDownloader) {
 return fileDownloader.observeFileDownloadProgress()
   .takeUntil(fileDownloadCallback -> fileDownloadCallback.getDownloadState()
     == AppDownloadStatus.AppDownloadState.ERROR_FILE_NOT_FOUND)
   .flatMap(fileDownloadCallback -> {
    if (fileDownloadCallback.getDownloadState()
      == AppDownloadStatus.AppDownloadState.ERROR_FILE_NOT_FOUND) {
     Logger.getInstance()
       .d(TAG, "File not found error, restarting the download with the alternative link");
     FileDownloader retryFileDownloader =
       fileDownloaderProvider.createFileDownloader(md5, alternativeDownloadPath, fileType,
         packageName, versionCode, fileName, PublishSubject.create());
     this.fileDownloader = retryFileDownloader;
     return retryFileDownloader.startFileDownload()
       .andThen(handleFileDownloadProgress(retryFileDownloader));
    } else {
     return Observable.just(fileDownloadCallback);
    }
   })
   .doOnNext(fileDownloadCallback -> retryFileDownloadSubject.onNext(fileDownloadCallback));
}

代码示例来源:origin: akarnokd/akarnokd-misc

public static <T, R> Observable.Transformer<T, R> switchFlatMap(
      int n, Func1<T, Observable<R>> mapper) {
    return f ->
      Observable.defer(() -> {
        final AtomicInteger ingress = new AtomicInteger();
        final Subject<Integer, Integer> cancel =
            PublishSubject.<Integer>create().toSerialized();
        return f.flatMap(v -> {
          int id = ingress.getAndIncrement();
          Observable<R> o = mapper.call(v)
              .takeUntil(cancel.filter(e -> e == id + n));
          cancel.onNext(id);
          return o;
        });
      })
    ;
  }
}

代码示例来源:origin: nurkiewicz/rxjava-book-examples

@Test
public void sample_537() throws Exception {
  Observable.range(1, 5).takeUntil(x -> x == 3);  // [1, 2, 3]
  Observable.range(1, 5).takeWhile(x -> x != 3);  // [1, 2]
}

代码示例来源:origin: akarnokd/akarnokd-misc

@Test
  public void singleTakeUntil() {
    PublishSubject<String> controller = PublishSubject.create();

    TestSubscriber<String> testSubscriber = new TestSubscriber<>(0);

    Single.just("Hello")
      .toObservable()
      .takeUntil(controller.map(v -> { throw new CancellationException(); }))
      .subscribe(testSubscriber);

    controller.onNext("Stop flow");

    testSubscriber.requestMore(1);
    testSubscriber.assertNoValues();
    testSubscriber.assertError(CancellationException.class);
  }
}

相关文章

Observable类方法