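This article collects code examples for the Java method rx.Observable.startWith() and shows how Observable.startWith() is used in practice. The examples come mainly from GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they should serve as a useful reference. Details of Observable.startWith() are as follows:
Package path: rx.Observable
Class name: Observable
Method name: startWith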
Returns an Observable that emits the items in a specified Iterable before it begins to emit items emitted by the source Observable.
Scheduler: startWith does not operate by default on a particular Scheduler.
This description covers the Iterable overload; the examples below also use overloads that take one or more individual values or another Observable.
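To make the emission order concrete, here is a minimal plain-Java sketch that models it with ordinary lists instead of RxJava types. The class and method names are hypothetical, and the sketch only illustrates ordering; it says nothing about schedulers, backpressure, or asynchronous delivery.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Plain-Java model of startWith ordering (hypothetical helper, not RxJava code). */
final class StartWithSketch {
    private StartWithSketch() {}

    /** Returns the prefix items followed by the source items, in order. */
    static <T> List<T> startWith(Iterable<? extends T> prefix, Iterable<? extends T> source) {
        List<T> out = new ArrayList<>();
        for (T item : prefix) {
            out.add(item); // prefix items are emitted first
        }
        for (T item : source) {
            out.add(item); // then everything the source emits
        }
        return out;
    }

    public static void main(String[] args) {
        // Mirrors Observable.just(3, 4, 5).startWith(1, 2)
        System.out.println(startWith(Arrays.asList(1, 2), Arrays.asList(3, 4, 5)));
        // prints [1, 2, 3, 4, 5]
    }
}
```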
Code example source: yahoo/squidb
private <T> Observable<T> observeAndEmit(final T objectToEmit, Func1<Set<SqlTable<?>>, Boolean> tableFilter,
        boolean emitOnFirstSubscribe) {
    Observable<Set<SqlTable<?>>> observable = changedTablePublisher.filter(tableFilter);
    if (emitOnFirstSubscribe) {
        observable = observable.startWith(INITIAL_TABLE);
    }
    return observable.map(new Func1<Set<SqlTable<?>>, T>() {
        @Override
        public T call(Set<SqlTable<?>> sqlTables) {
            return objectToEmit;
        }
    });
}
Code example source: PipelineAI/pipeline
@Override
public Observable<Bucket> call() {
    return inputEventStream
            .observe()
            .window(bucketSizeInMs, TimeUnit.MILLISECONDS) //bucket it by the counter window so we can emit to the next operator in time chunks, not on every OnNext
            .flatMap(reduceBucketToSummary) //for a given bucket, turn it into a long array containing counts of event types
            .startWith(emptyEventCountsToStart); //start it with empty arrays to make consumer logic as generic as possible (windows are always full)
}
Code example source: PipelineAI/pipeline
protected RollingConcurrencyStream(final HystrixEventStream<HystrixCommandExecutionStarted> inputEventStream, final int numBuckets, final int bucketSizeInMs) {
    final List<Integer> emptyRollingMaxBuckets = new ArrayList<Integer>();
    for (int i = 0; i < numBuckets; i++) {
        emptyRollingMaxBuckets.add(0);
    }
    rollingMaxStream = inputEventStream
            .observe()
            .map(getConcurrencyCountFromEvent)
            .window(bucketSizeInMs, TimeUnit.MILLISECONDS)
            .flatMap(reduceStreamToMax)
            .startWith(emptyRollingMaxBuckets)
            .window(numBuckets, 1)
            .flatMap(reduceStreamToMax)
            .share()
            .onBackpressureDrop();
}
Code example source: PipelineAI/pipeline
protected RollingDistributionStream(final HystrixEventStream<Event> stream, final int numBuckets, final int bucketSizeInMs,
        final Func2<Histogram, Event, Histogram> addValuesToBucket) {
    final List<Histogram> emptyDistributionsToStart = new ArrayList<Histogram>();
    for (int i = 0; i < numBuckets; i++) {
        emptyDistributionsToStart.add(CachedValuesHistogram.getNewHistogram());
    }
    final Func1<Observable<Event>, Observable<Histogram>> reduceBucketToSingleDistribution = new Func1<Observable<Event>, Observable<Histogram>>() {
        @Override
        public Observable<Histogram> call(Observable<Event> bucket) {
            return bucket.reduce(CachedValuesHistogram.getNewHistogram(), addValuesToBucket);
        }
    };
    rollingDistributionStream = stream
            .observe()
            .window(bucketSizeInMs, TimeUnit.MILLISECONDS) //stream of unaggregated buckets
            .flatMap(reduceBucketToSingleDistribution) //stream of aggregated Histograms
            .startWith(emptyDistributionsToStart) //stream of aggregated Histograms that starts with n empty
            .window(numBuckets, 1) //windowed stream: each OnNext is a stream of n Histograms
            .flatMap(reduceWindowToSingleDistribution) //reduced stream: each OnNext is a single Histogram
            .map(cacheHistogramValues) //convert to CachedValueHistogram (commonly-accessed values are cached)
            .share()
            .onBackpressureDrop();
}
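The rolling-stream constructors above seed the bucket stream with numBuckets empty values before applying window(numBuckets, 1). The following plain-Java sketch, with hypothetical class and method names, models why that seeding lets the first real bucket already produce a full window. It uses integer maxima as in the concurrency example and considers only full windows; any trailing partial windows a finite reactive stream might produce are ignored.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Plain-Java model of "seed with n empty buckets, then slide a window of n" (not Hystrix code). */
final class SeededRollingMaxSketch {
    private SeededRollingMaxSketch() {}

    static List<Integer> rollingMax(List<Integer> bucketMaxes, int numBuckets) {
        if (numBuckets < 1) {
            throw new IllegalArgumentException("numBuckets must be at least 1");
        }
        // Counterpart of startWith(emptyRollingMaxBuckets): numBuckets zeros go in front.
        List<Integer> seeded = new ArrayList<>(Collections.nCopies(numBuckets, 0));
        seeded.addAll(bucketMaxes);

        List<Integer> out = new ArrayList<>();
        // Counterpart of window(numBuckets, 1): advance by one, reduce each full window to its max.
        for (int start = 0; start + numBuckets <= seeded.size(); start++) {
            out.add(Collections.max(seeded.subList(start, start + numBuckets)));
        }
        return out;
    }

    public static void main(String[] args) {
        // Seeded input is [0, 0, 3, 1, 4]; windows are [0,0], [0,3], [3,1], [1,4].
        System.out.println(rollingMax(Arrays.asList(3, 1, 4), 2)); // prints [0, 3, 3, 4]
    }
}
```

Without the seed values, the first result in this model could only appear once numBuckets real buckets had arrived.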
Code example source: konmik/nucleus
.startWith(Observable.range(0, requestedPageCount))
.concatMap(new Func1<Integer, Observable<String>>() {
    @Override
Code example source: jhusain/learnrxjava
public static void main(String args[]) {
    // buffer every 500ms (using 999999999 to mark start of output)
    hotStream().window(500, TimeUnit.MILLISECONDS).take(10)
            .flatMap(w -> w.startWith(999999999))
            .toBlocking().forEach(System.out::println);
    // buffer 10 items at a time (using 999999999 to mark start of output)
    hotStream().window(10).take(2)
            .flatMap(w -> w.startWith(999999999))
            .toBlocking().forEach(System.out::println);
    System.out.println("Done");
}
Code example source: ribot/ribot-app-android
/**
 * Retrieve list of venues. Behaviour:
 * 1. Return cached venues (empty list if none is cached)
 * 2. Return API venues (if different to cached ones)
 * 3. Save new venues from API in cache
 * 4. If an error happens and cache is not empty, returns venues from cache.
 */
public Observable<List<Venue>> getVenues() {
    String auth = RibotService.Util.buildAuthorization(mPreferencesHelper.getAccessToken());
    return mRibotService.getVenues(auth)
            .doOnNext(new Action1<List<Venue>>() {
                @Override
                public void call(List<Venue> venues) {
                    mPreferencesHelper.putVenues(venues);
                }
            })
            .onErrorResumeNext(new Func1<Throwable, Observable<? extends List<Venue>>>() {
                @Override
                public Observable<? extends List<Venue>> call(Throwable throwable) {
                    return getVenuesRecoveryObservable(throwable);
                }
            })
            .startWith(mPreferencesHelper.getVenuesAsObservable())
            .distinct();
}
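The cache-first behaviour described in the comment above (cached value first, API value only if it differs) can be modelled in plain Java. This is a sketch with hypothetical names, where each list element stands for one emission and a LinkedHashSet plays the role of distinct(); it does not model errors or the cache write.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;

/** Plain-Java model of startWith(cache) followed by distinct() (hypothetical helper). */
final class CacheFirstSketch {
    private CacheFirstSketch() {}

    static <T> List<T> cacheThenNetwork(List<T> cachedEmissions, List<T> networkEmissions) {
        // LinkedHashSet keeps first-seen order and drops emissions equal to an earlier one.
        LinkedHashSet<T> seen = new LinkedHashSet<>();
        seen.addAll(cachedEmissions);  // cached emissions come first
        seen.addAll(networkEmissions); // then whatever the network source emits
        return new ArrayList<>(seen);
    }

    public static void main(String[] args) {
        List<String> cached = Arrays.asList("Venue A");            // hypothetical data
        List<String> fresh = Arrays.asList("Venue A", "Venue B");  // hypothetical data
        // The API result differs from the cache, so both lists are delivered.
        System.out.println(cacheThenNetwork(Arrays.asList(cached), Arrays.asList(fresh)));
        // prints [[Venue A], [Venue A, Venue B]]
    }
}
```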
Code example source: henrymorgen/android-advanced-light
private void startWith() {
    Observable.just(3, 4, 5).startWith(1, 2)
            .subscribe(new Action1<Integer>() {
                @Override
                public void call(Integer integer) {
                    Log.d(TAG, "startWith:" + integer);
                }
            });
}
Code example source: com.intendia.gwt.rxgwt/rxgwt
/** An observable that starts with the source value and notifies of source value changes. */
public static <T, V extends HasValueChangeHandlers<T>> Observable<T> bindValueChange(V source, Function<V, T> get) {
    return RxHandlers.valueChange(source).map(ValueChangeEvent::getValue)
            .startWith(defer(() -> just(get.apply(source))));
}
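Wrapping the initial value in defer(...) means the getter runs when an observer subscribes, not when the pipeline is assembled. A minimal plain-Java sketch of that timing difference follows, using a Supplier in place of defer; all names are hypothetical and nothing here is rxgwt or RxJava API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Supplier;

/** Plain-Java model of a deferred initial value (hypothetical helper). */
final class DeferredInitialValueSketch {
    private DeferredInitialValueSketch() {}

    /** Models subscribing: reads the initial value now, then appends later changes. */
    static <T> List<T> subscribe(Supplier<T> initial, List<T> changes) {
        List<T> out = new ArrayList<>();
        out.add(initial.get()); // evaluated at subscription time
        out.addAll(changes);
        return out;
    }

    public static void main(String[] args) {
        String[] field = {"old"};                   // hypothetical mutable source value
        Supplier<String> current = () -> field[0];  // pipeline assembled while value is "old"
        field[0] = "new";                           // value changes before anyone subscribes
        System.out.println(subscribe(current, Arrays.asList("newer")));
        // prints [new, newer]
    }
}
```

Had the value been captured eagerly at assembly time, the first element in this model would have been the stale "old".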
Code example source: com.netflix.hystrix/hystrix-core
@Override
public Observable<Bucket> call() {
    return inputEventStream
            .observe()
            .window(bucketSizeInMs, TimeUnit.MILLISECONDS) //bucket it by the counter window so we can emit to the next operator in time chunks, not on every OnNext
            .flatMap(reduceBucketToSummary) //for a given bucket, turn it into a long array containing counts of event types
            .startWith(emptyEventCountsToStart); //start it with empty arrays to make consumer logic as generic as possible (windows are always full)
}
Code example source: Laimiux/rxnetwork-android
/**
 * Creates an observable that listens to connectivity changes
 */
public static Observable<Boolean> stream(Context context) {
    final Context applicationContext = context.getApplicationContext();
    final IntentFilter action = new IntentFilter(ConnectivityManager.CONNECTIVITY_ACTION);
    return ContentObservable.fromBroadcast(context, action)
            // To get initial connectivity status
            .startWith((Intent) null)
            .map(new Func1<Intent, Boolean>() {
                @Override
                public Boolean call(Intent ignored) {
                    return getConnectivityStatus(applicationContext);
                }
            }).distinctUntilChanged();
}
Code example source: PhilippeBoisney/SimpleDroidRx
public void streamLongTask() {
    this.startRunningLongTask();
    subscription = getObservableOnNormalTask()
            .startWith(getObservableOnLongTask(true))
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(getObserverLongTask());
}
Code example source: com.netflix.hystrix/hystrix-core
protected RollingConcurrencyStream(final HystrixEventStream<HystrixCommandExecutionStarted> inputEventStream, final int numBuckets, final int bucketSizeInMs) {
    final List<Integer> emptyRollingMaxBuckets = new ArrayList<Integer>();
    for (int i = 0; i < numBuckets; i++) {
        emptyRollingMaxBuckets.add(0);
    }
    rollingMaxStream = inputEventStream
            .observe()
            .map(getConcurrencyCountFromEvent)
            .window(bucketSizeInMs, TimeUnit.MILLISECONDS)
            .flatMap(reduceStreamToMax)
            .startWith(emptyRollingMaxBuckets)
            .window(numBuckets, 1)
            .flatMap(reduceStreamToMax)
            .share()
            .onBackpressureDrop();
}
Code example source: com.intendia.gwt.rxgwt/rxgwt
public static <T> Observable<T> bindSingleSelectionChange(SingleSelectionModel<T> source) {
    return RxHandlers.selectionChange(source).map(e -> source.getSelectedObject())
            .startWith(defer(() -> just(source.getSelectedObject())));
}
Code example source: com.intendia.gwt.rxgwt/rxgwt
public static <T> Observable<Set<T>> bindSetSelectionChange(SetSelectionModel<T> source) {
    return RxHandlers.selectionChange(source).map(e -> source.getSelectedSet())
            .startWith(defer(() -> just(source.getSelectedSet())));
}
Code example source: nurkiewicz/rxjava-book-examples
@Test
public void sample_367() throws Exception {
    Observable
            .just(1, 2)
            .startWith(0)
            .subscribe(System.out::println);
}
Code example source: nurkiewicz/rxjava-book-examples
Observable<String> speak(String quote, long millisPerChar) {
    String[] tokens = quote.replaceAll("[:,]", "").split(" ");
    Observable<String> words = Observable.from(tokens);
    Observable<Long> absoluteDelay = words
            .map(String::length)
            .map(len -> len * millisPerChar)
            .scan((total, current) -> total + current);
    return words
            .zipWith(absoluteDelay.startWith(0L), Pair::of)
            .flatMap(pair -> just(pair.getLeft())
                    .delay(pair.getRight(), MILLISECONDS));
}
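In the speak example, startWith(0L) shifts the running totals by one position so that each word is paired with the time taken by the words before it, and the first word gets a delay of zero. The arithmetic can be sketched in plain Java as below; the class and method names are hypothetical and the sketch computes only the delays, without any actual timing.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java model of the delay arithmetic in speak() (hypothetical helper). */
final class SpeechDelaySketch {
    private SpeechDelaySketch() {}

    /** Delay before each word = total length of all earlier words * millisPerChar. */
    static List<Long> delaysFor(String[] words, long millisPerChar) {
        List<Long> delays = new ArrayList<>();
        long total = 0L; // plays the role of the startWith(0L) seed
        for (String word : words) {
            delays.add(total);                      // word is paired with the total so far
            total += word.length() * millisPerChar; // running sum, like the scan step
        }
        return delays;
    }

    public static void main(String[] args) {
        System.out.println(delaysFor(new String[] {"To", "be", "or"}, 10L));
        // prints [0, 20, 40]
    }
}
```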
Code example source: io.requery/requery
static <T> Observable<RxResult<T>> toResultObservable(final RxResult<T> result) {
    final QueryElement<?> element = result.unwrapQuery();
    // ensure the transaction listener is added in the target data store
    result.addTransactionListener(typeChanges);
    return typeChanges.commitSubject()
            .filter(new Func1<Set<Type<?>>, Boolean>() {
                @Override
                public Boolean call(Set<Type<?>> types) {
                    return !Collections.disjoint(element.entityTypes(), types) ||
                            Types.referencesType(element.entityTypes(), types);
                }
            }).map(new Func1<Set<Type<?>>, RxResult<T>>() {
                @Override
                public RxResult<T> call(Set<Type<?>> types) {
                    return result;
                }
            }).startWith(result);
}
Code example source: com.netflix.falcor/falcor-router
/**
 * Completes the request with the given reference and paths. Continues handling of the reference path followed by
 * the unmatched path via alternative route.
 */
public Observable<RouteResult> ref(FalcorPath matched, PathTree unmatched, FalcorPath path, Route<R> route) {
    return route.call(withPaths(path, unmatched))
            .startWith(Complete(matched, PathTree.empty(), new PathValue(matched, new Ref(path))));
}
Code example source: nurkiewicz/rxjava-book-examples
@Test
public void sample_355() throws Exception {
    Observable<String> fast = interval(10, MILLISECONDS)
            .map(x -> "F" + x)
            .delay(100, MILLISECONDS)
            .startWith("FX");
    Observable<String> slow = interval(17, MILLISECONDS).map(x -> "S" + x);
    slow
            .withLatestFrom(fast, (s, f) -> s + ":" + f)
            .forEach(System.out::println);
}