rx.Observable.takeUntil()方法的使用及代码示例

x33g5p2x  于2022-01-25 转载在 其他  
字(12.7k)|赞(0)|评价(0)|浏览(257)

本文整理了Java中rx.Observable.takeUntil()方法的一些代码示例,展示了Observable.takeUntil()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Observable.takeUntil()方法的具体详情如下:
包路径:rx.Observable
类名称:Observable
方法名:takeUntil

Observable.takeUntil介绍

[英]Returns an Observable that emits the items emitted by the source Observable until a second Observable emits an item.

Scheduler: takeUntil does not operate by default on a particular Scheduler.
[中]返回一个Observable,该Observable发出源Observable发出的项,直到第二个Observable发出一个项为止。
调度程序:默认情况下,TakeTill不会在特定调度程序上运行。

代码示例

代码示例来源:origin: BaronZ88/MinimalistWeather

  1. .filter(weather -> weather != null && !TextUtils.isEmpty(weather.getCityId()))
  2. .distinct(weather -> weather.getWeatherLive().getTime())
  3. .takeUntil(weather -> !refreshNow && System.currentTimeMillis() - weather.getWeatherLive().getTime() <= 15 * 60 * 1000);

代码示例来源:origin: leeowenowen/rxjava-examples

  1. @Override
  2. public void run() {
  3. Observable.range(1, 10).takeUntil(new Func1<Integer, Boolean>() {
  4. @Override
  5. public Boolean call(Integer i) {
  6. return i > 3;
  7. }
  8. }).subscribe(new Action1<Integer>() {
  9. @Override
  10. public void call(Integer integer) {
  11. log(integer);
  12. }
  13. });
  14. }
  15. });

代码示例来源:origin: meltwater/rxrabbit

  1. @Override
  2. public Observable<Message> call(Observable<Message> input) {
  3. final AtomicLong consumedCount = new AtomicLong(0);
  4. return input
  5. .doOnNext(message -> message.acknowledger.ack())
  6. .timeout(timeout, timeUnit, just(STOP))
  7. .takeUntil(message -> message == STOP || consumedEnough(consumedCount.incrementAndGet()) )
  8. .filter(message -> message != STOP);
  9. }

代码示例来源:origin: Aptoide/aptoide-client-v8

  1. @Override public Observable<Download> getDownload(String md5) {
  2. return downloadsRepository.getDownload(md5)
  3. .flatMap(download -> {
  4. if (download == null || isFileMissingFromCompletedDownload(download)) {
  5. return Observable.error(new DownloadNotFoundException());
  6. } else {
  7. return Observable.just(download);
  8. }
  9. })
  10. .takeUntil(
  11. storedDownload -> storedDownload.getOverallDownloadStatus() == Download.COMPLETED);
  12. }

代码示例来源:origin: codeabovelab/haven-platform

  1. public void subscribeToChanges(Action1<ResultCode> action, int checkIntervalInSec) {
  2. monitor.subscribeOn(Schedulers.newThread()).subscribe(action);
  3. Observable.interval(checkIntervalInSec, TimeUnit.SECONDS).takeUntil(t -> monitor.hasCompleted()).subscribe(observer);
  4. }

代码示例来源:origin: org.zalando.paradox/paradox-nakadi-consumer-core

  1. private <T> Observable.Transformer<T, Long> zipWithFlatMap(final String reason) {
  2. return
  3. observable ->
  4. observable.zipWith(
  5. Observable.range(1, Integer.MAX_VALUE), (t, repeatAttempt) -> {
  6. // Void or Throwable
  7. if (t instanceof Throwable) {
  8. log.warn("Exception [{}]", getMessage((Throwable)t));
  9. }
  10. return repeatAttempt;
  11. }).flatMap(repeatAttempt -> {
  12. final long retryAfterMillis = httpReactiveHandler.getRetryAfterMillis();
  13. checkArgument(retryAfterMillis > 0, "RetryAfterMillis must be greater than 0");
  14. log.debug("Restart after [{}] ms running [{}] reason [{}] attempt : [{}]", retryAfterMillis,
  15. running.get(), reason, repeatAttempt);
  16. return Observable.timer(retryAfterMillis, TimeUnit.MILLISECONDS);
  17. }).takeUntil((stopPredicate) -> !running.get());
  18. }
  19. //J+

代码示例来源:origin: hawkular/hawkular-metrics

  1. public void run() {
  2. logger.info("Checking for expired temp tables");
  3. Observable.interval(1, TimeUnit.DAYS, Schedulers.io())
  4. .takeUntil(i -> finished)
  5. .flatMap(i -> session.execute(findTables.bind()))
  6. .compose(applyRetryPolicy())
  7. .flatMap(Observable::from)
  8. .filter(row -> row.getString(0).startsWith(DataAccessImpl.TEMP_TABLE_NAME_PROTOTYPE))
  9. .map(row -> row.getString(0))
  10. .filter(this::isTableExpired)
  11. .flatMap(this::dropTable)
  12. .subscribe(
  13. table -> logger.infof("Dropped table %s", table),
  14. t -> logger.warn("Cleaning temp tables failed", t),
  15. () -> logger.infof("Finished cleaning expired temp tables")
  16. );
  17. }

代码示例来源:origin: org.hawkular.metrics/hawkular-metrics-core-service

  1. public void run() {
  2. logger.info("Checking for expired temp tables");
  3. Observable.interval(1, TimeUnit.DAYS, Schedulers.io())
  4. .takeUntil(i -> finished)
  5. .flatMap(i -> session.execute(findTables.bind()))
  6. .compose(applyRetryPolicy())
  7. .flatMap(Observable::from)
  8. .filter(row -> row.getString(0).startsWith(DataAccessImpl.TEMP_TABLE_NAME_PROTOTYPE))
  9. .map(row -> row.getString(0))
  10. .filter(this::isTableExpired)
  11. .flatMap(this::dropTable)
  12. .subscribe(
  13. table -> logger.infof("Dropped table %s", table),
  14. t -> logger.warn("Cleaning temp tables failed", t),
  15. () -> logger.infof("Finished cleaning expired temp tables")
  16. );
  17. }

代码示例来源:origin: hawkular/hawkular-metrics

  1. private void doOnTick(Action0 action) {
  2. Action0 wrapper = () -> {
  3. Date timeSlice = getTimeSlice(new DateTime(tickScheduler.now()), minutes(1).toStandardDuration()).toDate();
  4. action.call();
  5. };
  6. AtomicReference<DateTime> previousTimeSliceRef = new AtomicReference<>();
  7. // TODO Emit ticks at the start of every minute
  8. Observable.interval(0, 1, TimeUnit.MINUTES, tickScheduler)
  9. .filter(tick -> {
  10. DateTime time = currentMinute();
  11. if (previousTimeSliceRef.get() == null) {
  12. previousTimeSliceRef.set(time);
  13. return true;
  14. }
  15. if (previousTimeSliceRef.get().equals(time)) {
  16. return false;
  17. }
  18. previousTimeSliceRef.set(time);
  19. return true;
  20. })
  21. .takeUntil(d -> !running)
  22. .subscribe(tick -> wrapper.call(), t -> logger.warn(t));
  23. }

代码示例来源:origin: Aptoide/aptoide-client-v8

  1. /**
  2. * Observe changes to a download. This observable never completes it will emmit items whenever
  3. * the download state changes.
  4. *
  5. * @return observable for download state changes.
  6. */
  7. public Observable<Download> getDownload(String md5) {
  8. return downloadAccessor.get(md5)
  9. .flatMap(download -> {
  10. if (download == null || (download.getOverallDownloadStatus() == Download.COMPLETED
  11. && getStateIfFileExists(download) == Download.FILE_MISSING)) {
  12. return Observable.error(new DownloadNotFoundException());
  13. } else {
  14. return Observable.just(download);
  15. }
  16. })
  17. .takeUntil(
  18. storedDownload -> storedDownload.getOverallDownloadStatus() == Download.COMPLETED);
  19. }

代码示例来源:origin: com.trunk.rx.json/rxjava-json-core

  1. @Override
  2. public Observable<JsonPathEvent> call(Observable<JsonTokenEvent> upstream) {
  3. ConcurrentHashMap<JsonPath, Integer> visitedMatchers = new ConcurrentHashMap<>();
  4. ConcurrentHashMap<JsonPath, Integer> completedMatchers = new ConcurrentHashMap<>();
  5. matchers.forEach(jsonPath -> {
  6. visitedMatchers.put(jsonPath, 0);
  7. completedMatchers.put(jsonPath, 0);
  8. });
  9. return upstream
  10. .takeUntil(ignore -> !lenient && allMatchersComplete(visitedMatchers, completedMatchers))
  11. .concatMap( // order is important
  12. jsonTokenEvent ->
  13. matches(jsonTokenEvent, visitedMatchers, completedMatchers)
  14. .concatWith(
  15. jsonTokenEvent.getToken() == JsonDocumentEnd.instance() ?
  16. Observable.just(new JsonPathEvent(NoopToken.instance(), jsonTokenEvent)) :
  17. Observable.empty()
  18. )
  19. );
  20. }

代码示例来源:origin: techery/janet

  1. private <A> Observable<ActionState<A>> send(final A action) {
  2. return pipeline.asObservable()
  3. .filter(new Func1<ActionPair, Boolean>() {
  4. @Override public Boolean call(ActionPair pair) {
  5. return pair.holder.isOrigin(action);
  6. }
  7. })
  8. .map(new Func1<ActionPair, ActionState>() {
  9. @Override public ActionState call(ActionPair pair) {
  10. return pair.state;
  11. }
  12. })
  13. .compose(new CastToState<A>())
  14. .mergeWith(Observable.<ActionState<A>>empty()
  15. .doOnSubscribe(new Action0() {
  16. @Override public void call() {
  17. doSend(action);
  18. }
  19. }))
  20. .takeUntil(new Func1<ActionState, Boolean>() {
  21. @Override public Boolean call(ActionState actionState) {
  22. return actionState.status == ActionState.Status.SUCCESS
  23. || actionState.status == ActionState.Status.FAIL;
  24. }
  25. });
  26. }

代码示例来源:origin: org.jboss.hal/hal-dmr

  1. /**
  2. * Executes the composite operation until the operation successfully returns and the precondition is met.
  3. * The precondition receives the composite result of the operation.
  4. */
  5. @SuppressWarnings("HardCodedStringLiteral")
  6. public static Completable repeatCompositeUntil(Dispatcher dispatcher, int timeout, Composite composite,
  7. @Nullable Predicate<CompositeResult> until) {
  8. logger.debug("Repeat {} using {} seconds as timeout", composite, timeout);
  9. Single<CompositeResult> execution = Single.fromEmitter(em -> dispatcher.execute(composite, em::onSuccess,
  10. (op, fail) -> em.onSuccess(compositeFailure("Dispatcher failure: " + fail)),
  11. (op, ex) -> em.onSuccess(compositeFailure("Dispatcher exception: " + ex.getMessage()))));
  12. if (until == null) {
  13. until = r -> r.stream().noneMatch(ModelNode::isFailure); // default: until success
  14. }
  15. return Observable
  16. .interval(INTERVAL, MILLISECONDS) // execute a operation each INTERVAL millis
  17. .doOnEach(n -> logger.debug("#{}: execute {}", n.getValue(), composite))
  18. .flatMapSingle(n -> execution, false, 1)
  19. .takeUntil(until::test) // until succeeded
  20. .toCompletable().timeout(timeout, SECONDS); // wait succeeded or stop after timeout seconds
  21. }

代码示例来源:origin: Aptoide/aptoide-client-v8

  1. private Observable<Download> handleDownloadProgress(AppDownloader appDownloader) {
  2. return appDownloader.observeDownloadProgress()
  3. .flatMap(appDownloadStatus -> downloadsRepository.getDownload(appDownloadStatus.getMd5())
  4. .first()
  5. .flatMap(download -> updateDownload(download, appDownloadStatus)))
  6. .filter(download -> download.getOverallDownloadStatus() == Download.COMPLETED)
  7. .doOnNext(download -> removeAppDownloader(download.getMd5()))
  8. .doOnNext(download -> downloadAnalytics.onDownloadComplete(download.getMd5(),
  9. download.getPackageName(), download.getVersionCode()))
  10. .takeUntil(download -> download.getOverallDownloadStatus() == Download.COMPLETED);
  11. }

代码示例来源:origin: akarnokd/akarnokd-misc

  1. static <T> Observable.Transformer<T, T> debounceFirst(long timeout, TimeUnit unit) {
  2. return f ->
  3. f.publish(g ->
  4. g.take(1)
  5. .concatWith(
  6. g.switchMap(u -> Observable.timer(timeout, unit).map(w -> u))
  7. .take(1)
  8. .ignoreElements()
  9. )
  10. .repeatWhen(h -> h.takeUntil(g.ignoreElements()))
  11. )
  12. ;
  13. }
  14. }

代码示例来源:origin: org.jboss.hal/hal-dmr

  1. /**
  2. * Executes the operation until the operation successfully returns and the precondition is met. The precondition
  3. * receives the result of the operation.
  4. */
  5. @SuppressWarnings("HardCodedStringLiteral")
  6. public static Completable repeatOperationUntil(Dispatcher dispatcher, int timeout, Operation operation,
  7. @Nullable Predicate<ModelNode> until) {
  8. logger.debug("Repeat {} using {} seconds timeout", operation.asCli(), timeout);
  9. Single<ModelNode> execution = Single.fromEmitter(em -> dispatcher.execute(operation, em::onSuccess,
  10. (op, fail) -> em.onSuccess(operationFailure("Dispatcher failure: " + fail)),
  11. (op, ex) -> em.onSuccess(operationFailure("Dispatcher exception: " + ex.getMessage()))));
  12. if (until == null) {
  13. until = r -> !r.isFailure(); // default: until success
  14. }
  15. return Observable
  16. .interval(INTERVAL, MILLISECONDS) // execute a operation each INTERVAL millis
  17. .doOnEach(n -> logger.debug("#{}: execute {}", n.getValue(), operation.asCli()))
  18. .flatMapSingle(n -> execution, false, 1)
  19. .takeUntil(until::test) // until succeeded
  20. .toCompletable().timeout(timeout, SECONDS); // wait succeeded or stop after timeout seconds
  21. }

代码示例来源:origin: Aptoide/aptoide-client-v8

  1. private Observable<FileDownloadCallback> handleFileDownloadProgress(
  2. FileDownloader fileDownloader) {
  3. return fileDownloader.observeFileDownloadProgress()
  4. .takeUntil(fileDownloadCallback -> fileDownloadCallback.getDownloadState()
  5. == AppDownloadStatus.AppDownloadState.ERROR_FILE_NOT_FOUND)
  6. .flatMap(fileDownloadCallback -> {
  7. if (fileDownloadCallback.getDownloadState()
  8. == AppDownloadStatus.AppDownloadState.ERROR_FILE_NOT_FOUND) {
  9. Logger.getInstance()
  10. .d(TAG, "File not found error, restarting the download with the alternative link");
  11. FileDownloader retryFileDownloader =
  12. fileDownloaderProvider.createFileDownloader(md5, alternativeDownloadPath, fileType,
  13. packageName, versionCode, fileName, PublishSubject.create());
  14. this.fileDownloader = retryFileDownloader;
  15. return retryFileDownloader.startFileDownload()
  16. .andThen(handleFileDownloadProgress(retryFileDownloader));
  17. } else {
  18. return Observable.just(fileDownloadCallback);
  19. }
  20. })
  21. .doOnNext(fileDownloadCallback -> retryFileDownloadSubject.onNext(fileDownloadCallback));
  22. }

代码示例来源:origin: akarnokd/akarnokd-misc

  1. public static <T, R> Observable.Transformer<T, R> switchFlatMap(
  2. int n, Func1<T, Observable<R>> mapper) {
  3. return f ->
  4. Observable.defer(() -> {
  5. final AtomicInteger ingress = new AtomicInteger();
  6. final Subject<Integer, Integer> cancel =
  7. PublishSubject.<Integer>create().toSerialized();
  8. return f.flatMap(v -> {
  9. int id = ingress.getAndIncrement();
  10. Observable<R> o = mapper.call(v)
  11. .takeUntil(cancel.filter(e -> e == id + n));
  12. cancel.onNext(id);
  13. return o;
  14. });
  15. })
  16. ;
  17. }
  18. }

代码示例来源:origin: nurkiewicz/rxjava-book-examples

  1. @Test
  2. public void sample_537() throws Exception {
  3. Observable.range(1, 5).takeUntil(x -> x == 3); // [1, 2, 3]
  4. Observable.range(1, 5).takeWhile(x -> x != 3); // [1, 2]
  5. }

代码示例来源:origin: akarnokd/akarnokd-misc

  1. @Test
  2. public void singleTakeUntil() {
  3. PublishSubject<String> controller = PublishSubject.create();
  4. TestSubscriber<String> testSubscriber = new TestSubscriber<>(0);
  5. Single.just("Hello")
  6. .toObservable()
  7. .takeUntil(controller.map(v -> { throw new CancellationException(); }))
  8. .subscribe(testSubscriber);
  9. controller.onNext("Stop flow");
  10. testSubscriber.requestMore(1);
  11. testSubscriber.assertNoValues();
  12. testSubscriber.assertError(CancellationException.class);
  13. }
  14. }

相关文章

Observable类方法