Usage and code examples for the rx.Observable.startWith() method

x33g5p2x, reposted on 2022-01-25 under "Other"

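This article collects code examples for the Java method rx.Observable.startWith() and shows how Observable.startWith() is used in practice. The examples were extracted from selected projects found mainly on GitHub, Stack Overflow, and Maven, so they should serve as useful reference material. Details of the method:
Fully qualified class: rx.Observable
Class name: Observable
Method name: startWith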

About Observable.startWith

Returns an Observable that emits the items in a specified Iterable before it begins to emit items emitted by the source Observable. Other overloads accept one or more individual values, or another Observable, to emit first; several of the examples in this article use those forms.

Scheduler: startWith does not operate by default on a particular Scheduler.
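The ordering described above can be illustrated without RxJava itself. The following is a minimal sketch in plain Java that models a finite stream as a List and prepends the initial items, which is the emission order startWith produces. The class StartWithSketch and its startWith helper are hypothetical names used only for illustration; they are not part of RxJava, and the sketch ignores everything asynchronous (subscription, schedulers, backpressure).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical model, not RxJava code: a finite "stream" is represented as a List.
class StartWithSketch {

    // Returns the initial items followed by the source items, mirroring the
    // emission order of source.startWith(initial).
    static <T> List<T> startWith(Iterable<? extends T> initial, List<? extends T> source) {
        List<T> out = new ArrayList<>();
        for (T item : initial) {
            out.add(item);      // initial items come first
        }
        out.addAll(source);     // then everything the source would emit
        return out;
    }

    public static void main(String[] args) {
        // Models Observable.just(3, 4, 5).startWith(Arrays.asList(1, 2))
        System.out.println(startWith(Arrays.asList(1, 2), Arrays.asList(3, 4, 5)));
        // prints [1, 2, 3, 4, 5]
    }
}
```

In real RxJava code the prepended items are emitted at subscription time, so each new subscriber sees them before anything from the source.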

Code examples

Example source: yahoo/squidb

    private <T> Observable<T> observeAndEmit(final T objectToEmit, Func1<Set<SqlTable<?>>, Boolean> tableFilter,
            boolean emitOnFirstSubscribe) {
        Observable<Set<SqlTable<?>>> observable = changedTablePublisher.filter(tableFilter);
        if (emitOnFirstSubscribe) {
            // Prepend an initial item so a subscriber gets an emission before any table change.
            observable = observable.startWith(INITIAL_TABLE);
        }
        return observable.map(new Func1<Set<SqlTable<?>>, T>() {
            @Override
            public T call(Set<SqlTable<?>> sqlTables) {
                return objectToEmit;
            }
        });
    }

Example source: PipelineAI/pipeline (the same code is also published as com.netflix.hystrix/hystrix-core)

    @Override
    public Observable<Bucket> call() {
        return inputEventStream
                .observe()
                // bucket it by the counter window so we can emit to the next operator in time chunks, not on every OnNext
                .window(bucketSizeInMs, TimeUnit.MILLISECONDS)
                // for a given bucket, turn it into a long array containing counts of event types
                .flatMap(reduceBucketToSummary)
                // start it with empty arrays to make consumer logic as generic as possible (windows are always full)
                .startWith(emptyEventCountsToStart);
    }

Example source: PipelineAI/pipeline (the same code is also published as com.netflix.hystrix/hystrix-core)

    protected RollingConcurrencyStream(final HystrixEventStream<HystrixCommandExecutionStarted> inputEventStream, final int numBuckets, final int bucketSizeInMs) {
        // Seed values: one zero per bucket, emitted before any real bucket.
        final List<Integer> emptyRollingMaxBuckets = new ArrayList<Integer>();
        for (int i = 0; i < numBuckets; i++) {
            emptyRollingMaxBuckets.add(0);
        }
        rollingMaxStream = inputEventStream
                .observe()
                .map(getConcurrencyCountFromEvent)
                .window(bucketSizeInMs, TimeUnit.MILLISECONDS)
                .flatMap(reduceStreamToMax)
                .startWith(emptyRollingMaxBuckets)
                .window(numBuckets, 1)
                .flatMap(reduceStreamToMax)
                .share()
                .onBackpressureDrop();
    }

Example source: PipelineAI/pipeline

    protected RollingDistributionStream(final HystrixEventStream<Event> stream, final int numBuckets, final int bucketSizeInMs,
            final Func2<Histogram, Event, Histogram> addValuesToBucket) {
        final List<Histogram> emptyDistributionsToStart = new ArrayList<Histogram>();
        for (int i = 0; i < numBuckets; i++) {
            emptyDistributionsToStart.add(CachedValuesHistogram.getNewHistogram());
        }
        final Func1<Observable<Event>, Observable<Histogram>> reduceBucketToSingleDistribution = new Func1<Observable<Event>, Observable<Histogram>>() {
            @Override
            public Observable<Histogram> call(Observable<Event> bucket) {
                return bucket.reduce(CachedValuesHistogram.getNewHistogram(), addValuesToBucket);
            }
        };
        rollingDistributionStream = stream
                .observe()
                .window(bucketSizeInMs, TimeUnit.MILLISECONDS) // stream of unaggregated buckets
                .flatMap(reduceBucketToSingleDistribution)     // stream of aggregated Histograms
                .startWith(emptyDistributionsToStart)          // stream of aggregated Histograms that starts with n empty
                .window(numBuckets, 1)                         // windowed stream: each OnNext is a stream of n Histograms
                .flatMap(reduceWindowToSingleDistribution)     // reduced stream: each OnNext is a single Histogram
                .map(cacheHistogramValues)                     // convert to CachedValueHistogram (commonly-accessed values are cached)
                .share()
                .onBackpressureDrop();
    }
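
The rolling-stream examples above seed the bucket stream with empty buckets so that every sliding window is full. As a rough illustration of why that matters, here is a minimal plain-Java sketch. It assumes buckets are simple integer counts and that each window is reduced by summing; both are illustrative assumptions, and SeededWindowSketch and rollingSums are hypothetical names, not Hystrix or RxJava API. Only full windows are modeled.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical model, not Hystrix or RxJava code.
class SeededWindowSketch {

    // Prepends numBuckets zero-valued buckets (the "startWith" step), then sums
    // every full sliding window of size numBuckets, advancing one bucket at a time.
    static List<Integer> rollingSums(List<Integer> buckets, int numBuckets) {
        List<Integer> seeded = new ArrayList<>(Collections.nCopies(numBuckets, 0));
        seeded.addAll(buckets);

        List<Integer> sums = new ArrayList<>();
        for (int start = 0; start + numBuckets <= seeded.size(); start++) {
            int sum = 0;
            for (int i = start; i < start + numBuckets; i++) {
                sum += seeded.get(i);
            }
            sums.add(sum);
        }
        return sums;
    }

    public static void main(String[] args) {
        System.out.println(rollingSums(Arrays.asList(5, 1, 4), 3));
        // prints [0, 5, 6, 10]
    }
}
```

In this model, leaving out the seed would yield a single sum (10) only after three buckets had arrived; with the seed, a sum is available for every bucket from the first one onward.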

Example source: konmik/nucleus (truncated fragment)

    .startWith(Observable.range(0, requestedPageCount))
    .concatMap(new Func1<Integer, Observable<String>>() {
        @Override

Example source: jhusain/learnrxjava

    public static void main(String args[]) {
        // window every 500ms (using 999999999 to mark the start of each window's output)
        hotStream()
                .window(500, TimeUnit.MILLISECONDS)
                .take(10)
                .flatMap(w -> w.startWith(999999999))
                .toBlocking()
                .forEach(System.out::println);
        // window 10 items at a time (using 999999999 to mark the start of each window's output)
        hotStream()
                .window(10)
                .take(2)
                .flatMap(w -> w.startWith(999999999))
                .toBlocking()
                .forEach(System.out::println);
        System.out.println("Done");
    }

Example source: ribot/ribot-app-android

    /**
     * Retrieve list of venues. Behaviour:
     * 1. Return cached venues (empty list if none is cached)
     * 2. Return API venues (if different to cached ones)
     * 3. Save new venues from API in cache
     * 4. If an error happens and cache is not empty, returns venues from cache.
     */
    public Observable<List<Venue>> getVenues() {
        String auth = RibotService.Util.buildAuthorization(mPreferencesHelper.getAccessToken());
        return mRibotService.getVenues(auth)
                .doOnNext(new Action1<List<Venue>>() {
                    @Override
                    public void call(List<Venue> venues) {
                        mPreferencesHelper.putVenues(venues);
                    }
                })
                .onErrorResumeNext(new Func1<Throwable, Observable<? extends List<Venue>>>() {
                    @Override
                    public Observable<? extends List<Venue>> call(Throwable throwable) {
                        return getVenuesRecoveryObservable(throwable);
                    }
                })
                // Emit the cached venues before the API result.
                .startWith(mPreferencesHelper.getVenuesAsObservable())
                .distinct();
    }

Example source: henrymorgen/android-advanced-light

    private void startWith() {
        // Emits 1, 2 first, then the source items 3, 4, 5.
        Observable.just(3, 4, 5).startWith(1, 2)
                .subscribe(new Action1<Integer>() {
                    @Override
                    public void call(Integer integer) {
                        Log.d(TAG, "startWith:" + integer);
                    }
                });
    }

Example source: com.intendia.gwt.rxgwt/rxgwt

    /** An observable that starts with the source's current value and then emits its value changes. */
    public static <T, V extends HasValueChangeHandlers<T>> Observable<T> bindValueChange(V source, Function<V, T> get) {
        return RxHandlers.valueChange(source).map(ValueChangeEvent::getValue)
                .startWith(defer(() -> just(get.apply(source))));
    }

Example source: Laimiux/rxnetwork-android

    /**
     * Creates an observable that listens to connectivity changes
     */
    public static Observable<Boolean> stream(Context context) {
        final Context applicationContext = context.getApplicationContext();
        final IntentFilter action = new IntentFilter(ConnectivityManager.CONNECTIVITY_ACTION);
        return ContentObservable.fromBroadcast(context, action)
                // To get initial connectivity status
                .startWith((Intent) null)
                .map(new Func1<Intent, Boolean>() {
                    @Override
                    public Boolean call(Intent ignored) {
                        return getConnectivityStatus(applicationContext);
                    }
                })
                .distinctUntilChanged();
    }

Example source: PhilippeBoisney/SimpleDroidRx

    public void streamLongTask() {
        this.startRunningLongTask();
        subscription = getObservableOnNormalTask()
                .startWith(getObservableOnLongTask(true))
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(getObserverLongTask());
    }

Example source: com.intendia.gwt.rxgwt/rxgwt

    public static <T> Observable<T> bindSingleSelectionChange(SingleSelectionModel<T> source) {
        return RxHandlers.selectionChange(source).map(e -> source.getSelectedObject())
                .startWith(defer(() -> just(source.getSelectedObject())));
    }

Example source: com.intendia.gwt.rxgwt/rxgwt

    public static <T> Observable<Set<T>> bindSetSelectionChange(SetSelectionModel<T> source) {
        return RxHandlers.selectionChange(source).map(e -> source.getSelectedSet())
                .startWith(defer(() -> just(source.getSelectedSet())));
    }

Example source: nurkiewicz/rxjava-book-examples

    @Test
    public void sample_367() throws Exception {
        // Prints 0, then 1 and 2.
        Observable
                .just(1, 2)
                .startWith(0)
                .subscribe(System.out::println);
    }

Example source: nurkiewicz/rxjava-book-examples

    Observable<String> speak(String quote, long millisPerChar) {
        String[] tokens = quote.replaceAll("[:,]", "").split(" ");
        Observable<String> words = Observable.from(tokens);
        Observable<Long> absoluteDelay = words
                .map(String::length)
                .map(len -> len * millisPerChar)
                .scan((total, current) -> total + current);
        return words
                // startWith(0L) shifts the delays so the first word is paired with a zero delay
                .zipWith(absoluteDelay.startWith(0L), Pair::of)
                .flatMap(pair -> just(pair.getLeft())
                        .delay(pair.getRight(), MILLISECONDS));
    }

Example source: io.requery/requery

    static <T> Observable<RxResult<T>> toResultObservable(final RxResult<T> result) {
        final QueryElement<?> element = result.unwrapQuery();
        // ensure the transaction listener is added in the target data store
        result.addTransactionListener(typeChanges);
        return typeChanges.commitSubject()
                .filter(new Func1<Set<Type<?>>, Boolean>() {
                    @Override
                    public Boolean call(Set<Type<?>> types) {
                        return !Collections.disjoint(element.entityTypes(), types) ||
                                Types.referencesType(element.entityTypes(), types);
                    }
                }).map(new Func1<Set<Type<?>>, RxResult<T>>() {
                    @Override
                    public RxResult<T> call(Set<Type<?>> types) {
                        return result;
                    }
                }).startWith(result);
    }

Example source: com.netflix.falcor/falcor-router

    /**
     * Completes the request with the given reference and paths. Continues handling of the reference path followed by
     * the unmatched path via alternative route.
     */
    public Observable<RouteResult> ref(FalcorPath matched, PathTree unmatched, FalcorPath path, Route<R> route) {
        return route.call(withPaths(path, unmatched))
                .startWith(Complete(matched, PathTree.empty(), new PathValue(matched, new Ref(path))));
    }

Example source: nurkiewicz/rxjava-book-examples

    @Test
    public void sample_355() throws Exception {
        Observable<String> fast = interval(10, MILLISECONDS)
                .map(x -> "F" + x)
                .delay(100, MILLISECONDS)
                .startWith("FX");
        Observable<String> slow = interval(17, MILLISECONDS).map(x -> "S" + x);
        slow
                .withLatestFrom(fast, (s, f) -> s + ":" + f)
                .forEach(System.out::println);
    }
