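This article collects code examples for the rx.Observable.takeUntil() method in Java and shows how Observable.takeUntil() is used in practice. The examples come mainly from GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they should serve as a useful reference. Details of the Observable.takeUntil() method are as follows: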
Package path: rx.Observable
Class name: Observable
Method name: takeUntil
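Returns an Observable that emits the items emitted by the source Observable until a second Observable emits an item.
Scheduler: takeUntil does not operate by default on a particular Scheduler.
Most of the examples below use the predicate overload, takeUntil(Func1<? super T, Boolean> stopPredicate), which emits each source item, then evaluates the predicate on it and completes once the predicate returns true, so the item that triggers the stop is still emitted.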
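To make the "until a second Observable emits an item" rule concrete without depending on RxJava, here is a minimal plain-Java sketch that models both streams as one already-merged timeline. The names TakeUntilOtherSketch, Event, and takeUntilOther are hypothetical and invented for this illustration; they are not part of RxJava, and the real operator works asynchronously on live Observables rather than on a list.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical model of takeUntil(otherObservable); not RxJava code.
final class TakeUntilOtherSketch {

    /** One entry in an assumed merged timeline: a source item or a signal from the second stream. */
    static final class Event {
        final boolean fromOther;
        final String value;

        private Event(boolean fromOther, String value) {
            this.fromOther = fromOther;
            this.value = value;
        }

        static Event source(String value) {
            return new Event(false, value);
        }

        static Event other(String value) {
            return new Event(true, value);
        }
    }

    /** Collects source items until the first event from the second stream appears. */
    static List<String> takeUntilOther(List<Event> timeline) {
        List<String> emitted = new ArrayList<>();
        for (Event event : timeline) {
            if (event.fromOther) {
                break; // the second stream emitted: stop, and do not forward its item
            }
            emitted.add(event.value);
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<Event> timeline = Arrays.asList(
                Event.source("a"),
                Event.source("b"),
                Event.other("stop"),
                Event.source("c"));
        System.out.println(takeUntilOther(timeline)); // prints [a, b]
    }
}
```

In this model the item from the second stream only acts as a stop signal; its value never reaches the output, and source items after it are dropped.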
Code example source: BaronZ88/MinimalistWeather
.filter(weather -> weather != null && !TextUtils.isEmpty(weather.getCityId()))
.distinct(weather -> weather.getWeatherLive().getTime())
.takeUntil(weather -> !refreshNow && System.currentTimeMillis() - weather.getWeatherLive().getTime() <= 15 * 60 * 1000);
Code example source: leeowenowen/rxjava-examples
@Override
public void run() {
    Observable.range(1, 10).takeUntil(new Func1<Integer, Boolean>() {
        @Override
        public Boolean call(Integer i) {
            return i > 3;
        }
    }).subscribe(new Action1<Integer>() {
        @Override
        public void call(Integer integer) {
            log(integer);
        }
    });
}
Code example source: meltwater/rxrabbit
@Override
public Observable<Message> call(Observable<Message> input) {
    final AtomicLong consumedCount = new AtomicLong(0);
    return input
        .doOnNext(message -> message.acknowledger.ack())
        .timeout(timeout, timeUnit, just(STOP))
        .takeUntil(message -> message == STOP || consumedEnough(consumedCount.incrementAndGet()))
        .filter(message -> message != STOP);
}
Code example source: Aptoide/aptoide-client-v8
@Override public Observable<Download> getDownload(String md5) {
    return downloadsRepository.getDownload(md5)
        .flatMap(download -> {
            if (download == null || isFileMissingFromCompletedDownload(download)) {
                return Observable.error(new DownloadNotFoundException());
            } else {
                return Observable.just(download);
            }
        })
        .takeUntil(
            storedDownload -> storedDownload.getOverallDownloadStatus() == Download.COMPLETED);
}
Code example source: codeabovelab/haven-platform
public void subscribeToChanges(Action1<ResultCode> action, int checkIntervalInSec) {
    monitor.subscribeOn(Schedulers.newThread()).subscribe(action);
    Observable.interval(checkIntervalInSec, TimeUnit.SECONDS)
        .takeUntil(t -> monitor.hasCompleted())
        .subscribe(observer);
}
Code example source: org.zalando.paradox/paradox-nakadi-consumer-core
private <T> Observable.Transformer<T, Long> zipWithFlatMap(final String reason) {
    return observable ->
        observable.zipWith(
            Observable.range(1, Integer.MAX_VALUE), (t, repeatAttempt) -> {
                // Void or Throwable
                if (t instanceof Throwable) {
                    log.warn("Exception [{}]", getMessage((Throwable) t));
                }
                return repeatAttempt;
            }).flatMap(repeatAttempt -> {
                final long retryAfterMillis = httpReactiveHandler.getRetryAfterMillis();
                checkArgument(retryAfterMillis > 0, "RetryAfterMillis must be greater than 0");
                log.debug("Restart after [{}] ms running [{}] reason [{}] attempt : [{}]", retryAfterMillis,
                        running.get(), reason, repeatAttempt);
                return Observable.timer(retryAfterMillis, TimeUnit.MILLISECONDS);
            }).takeUntil((stopPredicate) -> !running.get());
}
Code example source: hawkular/hawkular-metrics
public void run() {
    logger.info("Checking for expired temp tables");
    Observable.interval(1, TimeUnit.DAYS, Schedulers.io())
        .takeUntil(i -> finished)
        .flatMap(i -> session.execute(findTables.bind()))
        .compose(applyRetryPolicy())
        .flatMap(Observable::from)
        .filter(row -> row.getString(0).startsWith(DataAccessImpl.TEMP_TABLE_NAME_PROTOTYPE))
        .map(row -> row.getString(0))
        .filter(this::isTableExpired)
        .flatMap(this::dropTable)
        .subscribe(
            table -> logger.infof("Dropped table %s", table),
            t -> logger.warn("Cleaning temp tables failed", t),
            () -> logger.infof("Finished cleaning expired temp tables")
        );
}
Code example source: hawkular/hawkular-metrics
private void doOnTick(Action0 action) {
    Action0 wrapper = () -> {
        Date timeSlice = getTimeSlice(new DateTime(tickScheduler.now()), minutes(1).toStandardDuration()).toDate();
        action.call();
    };
    AtomicReference<DateTime> previousTimeSliceRef = new AtomicReference<>();
    // TODO Emit ticks at the start of every minute
    Observable.interval(0, 1, TimeUnit.MINUTES, tickScheduler)
        .filter(tick -> {
            DateTime time = currentMinute();
            if (previousTimeSliceRef.get() == null) {
                previousTimeSliceRef.set(time);
                return true;
            }
            if (previousTimeSliceRef.get().equals(time)) {
                return false;
            }
            previousTimeSliceRef.set(time);
            return true;
        })
        .takeUntil(d -> !running)
        .subscribe(tick -> wrapper.call(), t -> logger.warn(t));
}
Code example source: Aptoide/aptoide-client-v8
/**
 * Observe changes to a download. This observable never completes; it will emit items whenever
 * the download state changes.
 *
 * @return observable for download state changes.
 */
public Observable<Download> getDownload(String md5) {
    return downloadAccessor.get(md5)
        .flatMap(download -> {
            if (download == null || (download.getOverallDownloadStatus() == Download.COMPLETED
                    && getStateIfFileExists(download) == Download.FILE_MISSING)) {
                return Observable.error(new DownloadNotFoundException());
            } else {
                return Observable.just(download);
            }
        })
        .takeUntil(
            storedDownload -> storedDownload.getOverallDownloadStatus() == Download.COMPLETED);
}
Code example source: com.trunk.rx.json/rxjava-json-core
@Override
public Observable<JsonPathEvent> call(Observable<JsonTokenEvent> upstream) {
    ConcurrentHashMap<JsonPath, Integer> visitedMatchers = new ConcurrentHashMap<>();
    ConcurrentHashMap<JsonPath, Integer> completedMatchers = new ConcurrentHashMap<>();
    matchers.forEach(jsonPath -> {
        visitedMatchers.put(jsonPath, 0);
        completedMatchers.put(jsonPath, 0);
    });
    return upstream
        .takeUntil(ignore -> !lenient && allMatchersComplete(visitedMatchers, completedMatchers))
        .concatMap( // order is important
            jsonTokenEvent ->
                matches(jsonTokenEvent, visitedMatchers, completedMatchers)
                    .concatWith(
                        jsonTokenEvent.getToken() == JsonDocumentEnd.instance() ?
                            Observable.just(new JsonPathEvent(NoopToken.instance(), jsonTokenEvent)) :
                            Observable.empty()
                    )
        );
}
Code example source: techery/janet
private <A> Observable<ActionState<A>> send(final A action) {
    return pipeline.asObservable()
        .filter(new Func1<ActionPair, Boolean>() {
            @Override public Boolean call(ActionPair pair) {
                return pair.holder.isOrigin(action);
            }
        })
        .map(new Func1<ActionPair, ActionState>() {
            @Override public ActionState call(ActionPair pair) {
                return pair.state;
            }
        })
        .compose(new CastToState<A>())
        .mergeWith(Observable.<ActionState<A>>empty()
            .doOnSubscribe(new Action0() {
                @Override public void call() {
                    doSend(action);
                }
            }))
        .takeUntil(new Func1<ActionState, Boolean>() {
            @Override public Boolean call(ActionState actionState) {
                return actionState.status == ActionState.Status.SUCCESS
                    || actionState.status == ActionState.Status.FAIL;
            }
        });
}
Code example source: org.jboss.hal/hal-dmr
/**
 * Executes the composite operation until the operation successfully returns and the precondition is met.
 * The precondition receives the composite result of the operation.
 */
@SuppressWarnings("HardCodedStringLiteral")
public static Completable repeatCompositeUntil(Dispatcher dispatcher, int timeout, Composite composite,
        @Nullable Predicate<CompositeResult> until) {
    logger.debug("Repeat {} using {} seconds as timeout", composite, timeout);
    Single<CompositeResult> execution = Single.fromEmitter(em -> dispatcher.execute(composite, em::onSuccess,
            (op, fail) -> em.onSuccess(compositeFailure("Dispatcher failure: " + fail)),
            (op, ex) -> em.onSuccess(compositeFailure("Dispatcher exception: " + ex.getMessage()))));
    if (until == null) {
        until = r -> r.stream().noneMatch(ModelNode::isFailure); // default: until success
    }
    return Observable
        .interval(INTERVAL, MILLISECONDS) // execute an operation every INTERVAL millis
        .doOnEach(n -> logger.debug("#{}: execute {}", n.getValue(), composite))
        .flatMapSingle(n -> execution, false, 1)
        .takeUntil(until::test) // until succeeded
        .toCompletable().timeout(timeout, SECONDS); // wait until succeeded or stop after timeout seconds
}
Code example source: Aptoide/aptoide-client-v8
private Observable<Download> handleDownloadProgress(AppDownloader appDownloader) {
    return appDownloader.observeDownloadProgress()
        .flatMap(appDownloadStatus -> downloadsRepository.getDownload(appDownloadStatus.getMd5())
            .first()
            .flatMap(download -> updateDownload(download, appDownloadStatus)))
        .filter(download -> download.getOverallDownloadStatus() == Download.COMPLETED)
        .doOnNext(download -> removeAppDownloader(download.getMd5()))
        .doOnNext(download -> downloadAnalytics.onDownloadComplete(download.getMd5(),
            download.getPackageName(), download.getVersionCode()))
        .takeUntil(download -> download.getOverallDownloadStatus() == Download.COMPLETED);
}
Code example source: akarnokd/akarnokd-misc
static <T> Observable.Transformer<T, T> debounceFirst(long timeout, TimeUnit unit) {
    return f ->
        f.publish(g ->
            g.take(1)
                .concatWith(
                    g.switchMap(u -> Observable.timer(timeout, unit).map(w -> u))
                        .take(1)
                        .ignoreElements()
                )
                .repeatWhen(h -> h.takeUntil(g.ignoreElements()))
        );
}
Code example source: org.jboss.hal/hal-dmr
/**
 * Executes the operation until the operation successfully returns and the precondition is met. The precondition
 * receives the result of the operation.
 */
@SuppressWarnings("HardCodedStringLiteral")
public static Completable repeatOperationUntil(Dispatcher dispatcher, int timeout, Operation operation,
        @Nullable Predicate<ModelNode> until) {
    logger.debug("Repeat {} using {} seconds timeout", operation.asCli(), timeout);
    Single<ModelNode> execution = Single.fromEmitter(em -> dispatcher.execute(operation, em::onSuccess,
            (op, fail) -> em.onSuccess(operationFailure("Dispatcher failure: " + fail)),
            (op, ex) -> em.onSuccess(operationFailure("Dispatcher exception: " + ex.getMessage()))));
    if (until == null) {
        until = r -> !r.isFailure(); // default: until success
    }
    return Observable
        .interval(INTERVAL, MILLISECONDS) // execute an operation every INTERVAL millis
        .doOnEach(n -> logger.debug("#{}: execute {}", n.getValue(), operation.asCli()))
        .flatMapSingle(n -> execution, false, 1)
        .takeUntil(until::test) // until succeeded
        .toCompletable().timeout(timeout, SECONDS); // wait until succeeded or stop after timeout seconds
}
Code example source: Aptoide/aptoide-client-v8
private Observable<FileDownloadCallback> handleFileDownloadProgress(
        FileDownloader fileDownloader) {
    return fileDownloader.observeFileDownloadProgress()
        .takeUntil(fileDownloadCallback -> fileDownloadCallback.getDownloadState()
            == AppDownloadStatus.AppDownloadState.ERROR_FILE_NOT_FOUND)
        .flatMap(fileDownloadCallback -> {
            if (fileDownloadCallback.getDownloadState()
                    == AppDownloadStatus.AppDownloadState.ERROR_FILE_NOT_FOUND) {
                Logger.getInstance()
                    .d(TAG, "File not found error, restarting the download with the alternative link");
                FileDownloader retryFileDownloader =
                    fileDownloaderProvider.createFileDownloader(md5, alternativeDownloadPath, fileType,
                        packageName, versionCode, fileName, PublishSubject.create());
                this.fileDownloader = retryFileDownloader;
                return retryFileDownloader.startFileDownload()
                    .andThen(handleFileDownloadProgress(retryFileDownloader));
            } else {
                return Observable.just(fileDownloadCallback);
            }
        })
        .doOnNext(fileDownloadCallback -> retryFileDownloadSubject.onNext(fileDownloadCallback));
}
Code example source: akarnokd/akarnokd-misc
public static <T, R> Observable.Transformer<T, R> switchFlatMap(
        int n, Func1<T, Observable<R>> mapper) {
    return f ->
        Observable.defer(() -> {
            final AtomicInteger ingress = new AtomicInteger();
            final Subject<Integer, Integer> cancel =
                PublishSubject.<Integer>create().toSerialized();
            return f.flatMap(v -> {
                int id = ingress.getAndIncrement();
                Observable<R> o = mapper.call(v)
                    .takeUntil(cancel.filter(e -> e == id + n));
                cancel.onNext(id);
                return o;
            });
        });
}
Code example source: nurkiewicz/rxjava-book-examples
@Test
public void sample_537() throws Exception {
    Observable.range(1, 5).takeUntil(x -> x == 3); // [1, 2, 3]
    Observable.range(1, 5).takeWhile(x -> x != 3); // [1, 2]
}
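The difference between the two results in the comments above comes down to when the predicate is checked. The following is a minimal plain-Java sketch of that difference over a list; the class TakeUntilVsTakeWhile and its two helper methods are hypothetical names written for this illustration and are not RxJava APIs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical list-based model of the two predicate operators; not RxJava code.
final class TakeUntilVsTakeWhile {

    /** Emit first, then test: the item that satisfies the stop predicate is included. */
    static <T> List<T> takeUntil(List<T> source, Predicate<? super T> stopPredicate) {
        List<T> out = new ArrayList<>();
        for (T item : source) {
            out.add(item);
            if (stopPredicate.test(item)) {
                break;
            }
        }
        return out;
    }

    /** Test first, then emit: the first item that fails the predicate is excluded. */
    static <T> List<T> takeWhile(List<T> source, Predicate<? super T> keepPredicate) {
        List<T> out = new ArrayList<>();
        for (T item : source) {
            if (!keepPredicate.test(item)) {
                break;
            }
            out.add(item);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> range = Arrays.asList(1, 2, 3, 4, 5);
        System.out.println(takeUntil(range, x -> x == 3)); // prints [1, 2, 3]
        System.out.println(takeWhile(range, x -> x != 3)); // prints [1, 2]
    }
}
```

The same emit-then-test order explains the leeowenowen/rxjava-examples snippet earlier in this article: with the stop predicate i > 3 over the range 1 to 10, the values 1, 2, 3, and 4 are logged.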
Code example source: akarnokd/akarnokd-misc
@Test
public void singleTakeUntil() {
    PublishSubject<String> controller = PublishSubject.create();
    TestSubscriber<String> testSubscriber = new TestSubscriber<>(0);
    Single.just("Hello")
        .toObservable()
        .takeUntil(controller.map(v -> { throw new CancellationException(); }))
        .subscribe(testSubscriber);
    controller.onNext("Stop flow");
    testSubscriber.requestMore(1);
    testSubscriber.assertNoValues();
    testSubscriber.assertError(CancellationException.class);
}