rx.Observable.startWith()方法的使用及代码示例

x33g5p2x  于2022-01-25 转载在 其他  
字(10.1k)|赞(0)|评价(0)|浏览(175)

本文整理了Java中rx.Observable.startWith()方法的一些代码示例,展示了Observable.startWith()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Observable.startWith()方法的具体详情如下:
包路径:rx.Observable
类名称:Observable
方法名:startWith

Observable.startWith介绍

[英]Returns an Observable that emits the items in a specified Iterable before it begins to emit items emitted by the source Observable.

Scheduler: startWith does not operate by default on a particular Scheduler.
[中]返回一个Observable,该Observable在开始发射源Observable发出的项之前,发射指定Iterable中的项。
调度程序:startWith默认情况下不会在特定调度程序上运行。

代码示例

代码示例来源:origin: yahoo/squidb

private <T> Observable<T> observeAndEmit(final T objectToEmit, Func1<Set<SqlTable<?>>, Boolean> tableFilter,
      boolean emitOnFirstSubscribe) {
    Observable<Set<SqlTable<?>>> observable = changedTablePublisher.filter(tableFilter);
    if (emitOnFirstSubscribe) {
      observable = observable.startWith(INITIAL_TABLE);
    }
    return observable.map(new Func1<Set<SqlTable<?>>, T>() {
      @Override
      public T call(Set<SqlTable<?>> sqlTables) {
        return objectToEmit;
      }
    });

  }
}

代码示例来源:origin: PipelineAI/pipeline

@Override
  public Observable<Bucket> call() {
    return inputEventStream
        .observe()
        .window(bucketSizeInMs, TimeUnit.MILLISECONDS) //bucket it by the counter window so we can emit to the next operator in time chunks, not on every OnNext
        .flatMap(reduceBucketToSummary)                //for a given bucket, turn it into a long array containing counts of event types
        .startWith(emptyEventCountsToStart);           //start it with empty arrays to make consumer logic as generic as possible (windows are always full)
  }
});

代码示例来源:origin: PipelineAI/pipeline

protected RollingConcurrencyStream(final HystrixEventStream<HystrixCommandExecutionStarted> inputEventStream, final int numBuckets, final int bucketSizeInMs) {
  final List<Integer> emptyRollingMaxBuckets = new ArrayList<Integer>();
  for (int i = 0; i < numBuckets; i++) {
    emptyRollingMaxBuckets.add(0);
  }
  rollingMaxStream = inputEventStream
      .observe()
      .map(getConcurrencyCountFromEvent)
      .window(bucketSizeInMs, TimeUnit.MILLISECONDS)
      .flatMap(reduceStreamToMax)
      .startWith(emptyRollingMaxBuckets)
      .window(numBuckets, 1)
      .flatMap(reduceStreamToMax)
      .share()
      .onBackpressureDrop();
}

代码示例来源:origin: PipelineAI/pipeline

protected RollingDistributionStream(final HystrixEventStream<Event> stream, final int numBuckets, final int bucketSizeInMs,
                  final Func2<Histogram, Event, Histogram> addValuesToBucket) {
  final List<Histogram> emptyDistributionsToStart = new ArrayList<Histogram>();
  for (int i = 0; i < numBuckets; i++) {
    emptyDistributionsToStart.add(CachedValuesHistogram.getNewHistogram());
  }
  final Func1<Observable<Event>, Observable<Histogram>> reduceBucketToSingleDistribution = new Func1<Observable<Event>, Observable<Histogram>>() {
    @Override
    public Observable<Histogram> call(Observable<Event> bucket) {
      return bucket.reduce(CachedValuesHistogram.getNewHistogram(), addValuesToBucket);
    }
  };
  rollingDistributionStream = stream
      .observe()
      .window(bucketSizeInMs, TimeUnit.MILLISECONDS) //stream of unaggregated buckets
      .flatMap(reduceBucketToSingleDistribution)     //stream of aggregated Histograms
      .startWith(emptyDistributionsToStart)          //stream of aggregated Histograms that starts with n empty
      .window(numBuckets, 1)                         //windowed stream: each OnNext is a stream of n Histograms
      .flatMap(reduceWindowToSingleDistribution)     //reduced stream: each OnNext is a single Histogram
      .map(cacheHistogramValues)                     //convert to CachedValueHistogram (commonly-accessed values are cached)
      .share()
      .onBackpressureDrop();
}

代码示例来源:origin: konmik/nucleus

.startWith(Observable.range(0, requestedPageCount))
.concatMap(new Func1<Integer, Observable<String>>() {
  @Override

代码示例来源:origin: jhusain/learnrxjava

public static void main(String args[]) {
  // buffer every 500ms (using 999999999 to mark start of output)
  hotStream().window(500, TimeUnit.MILLISECONDS).take(10).flatMap(w -> w.startWith(999999999)).toBlocking().forEach(System.out::println);
  // buffer 10 items at a time (using 999999999 to mark start of output)
  hotStream().window(10).take(2).flatMap(w -> w.startWith(999999999)).toBlocking().forEach(System.out::println);
  System.out.println("Done");
}

代码示例来源:origin: ribot/ribot-app-android

/**
 * Retrieve list of venues. Behaviour:
 * 1. Return cached venues (empty list if none is cached)
 * 2. Return API venues (if different to cached ones)
 * 3. Save new venues from API in cache
 * 5. If an error happens and cache is not empty, returns venues from cache.
 */
public Observable<List<Venue>> getVenues() {
  String auth = RibotService.Util.buildAuthorization(mPreferencesHelper.getAccessToken());
  return mRibotService.getVenues(auth)
      .doOnNext(new Action1<List<Venue>>() {
        @Override
        public void call(List<Venue> venues) {
          mPreferencesHelper.putVenues(venues);
        }
      })
      .onErrorResumeNext(new Func1<Throwable, Observable<? extends List<Venue>>>() {
        @Override
        public Observable<? extends List<Venue>> call(Throwable throwable) {
          return getVenuesRecoveryObservable(throwable);
        }
      })
      .startWith(mPreferencesHelper.getVenuesAsObservable())
      .distinct();
}

代码示例来源:origin: henrymorgen/android-advanced-light

private void startWith() {
    Observable.just(3, 4, 5).startWith(1, 2)
        .subscribe(new Action1<Integer>() {
          @Override
          public void call(Integer integer) {
            Log.d(TAG, "startWith:"+integer);
          }
        });
  }
}

代码示例来源:origin: com.intendia.gwt.rxgwt/rxgwt

/** An observable that start with the source value and notify source value changes. */
public static <T, V extends HasValueChangeHandlers<T>> Observable<T> bindValueChange(V source, Function<V, T> get) {
  return RxHandlers.valueChange(source).map(ValueChangeEvent::getValue)
      .startWith(defer(() -> just(get.apply(source))));
}

代码示例来源:origin: com.netflix.hystrix/hystrix-core

@Override
  public Observable<Bucket> call() {
    return inputEventStream
        .observe()
        .window(bucketSizeInMs, TimeUnit.MILLISECONDS) //bucket it by the counter window so we can emit to the next operator in time chunks, not on every OnNext
        .flatMap(reduceBucketToSummary)                //for a given bucket, turn it into a long array containing counts of event types
        .startWith(emptyEventCountsToStart);           //start it with empty arrays to make consumer logic as generic as possible (windows are always full)
  }
});

代码示例来源:origin: Laimiux/rxnetwork-android

/**
  * Creates an observable that listens to connectivity changes
  */
 public static Observable<Boolean> stream(Context context) {
  final Context applicationContext = context.getApplicationContext();
  final IntentFilter action = new IntentFilter(ConnectivityManager.CONNECTIVITY_ACTION);
  return ContentObservable.fromBroadcast(context, action)
    // To get initial connectivity status
    .startWith((Intent) null)
    .map(new Func1<Intent, Boolean>() {
     @Override public Boolean call(Intent ignored) {
      return getConnectivityStatus(applicationContext);
     }
    }).distinctUntilChanged();
 }
}

代码示例来源:origin: PhilippeBoisney/SimpleDroidRx

public void streamLongTask(){
  this.startRunningLongTask();
  subscription = getObservableOnNormalTask()
      .startWith(getObservableOnLongTask(true))
      .subscribeOn(Schedulers.io())
      .observeOn(AndroidSchedulers.mainThread())
      .subscribe(getObserverLongTask());
}

代码示例来源:origin: com.netflix.hystrix/hystrix-core

protected RollingConcurrencyStream(final HystrixEventStream<HystrixCommandExecutionStarted> inputEventStream, final int numBuckets, final int bucketSizeInMs) {
  final List<Integer> emptyRollingMaxBuckets = new ArrayList<Integer>();
  for (int i = 0; i < numBuckets; i++) {
    emptyRollingMaxBuckets.add(0);
  }
  rollingMaxStream = inputEventStream
      .observe()
      .map(getConcurrencyCountFromEvent)
      .window(bucketSizeInMs, TimeUnit.MILLISECONDS)
      .flatMap(reduceStreamToMax)
      .startWith(emptyRollingMaxBuckets)
      .window(numBuckets, 1)
      .flatMap(reduceStreamToMax)
      .share()
      .onBackpressureDrop();
}

代码示例来源:origin: com.intendia.gwt.rxgwt/rxgwt

public static <T> Observable<T> bindSingleSelectionChange(SingleSelectionModel<T> source) {
  return RxHandlers.selectionChange(source).map(e -> source.getSelectedObject())
      .startWith(defer(() -> just(source.getSelectedObject())));
}

代码示例来源:origin: com.intendia.gwt.rxgwt/rxgwt

public static <T> Observable<Set<T>> bindSetSelectionChange(SetSelectionModel<T> source) {
  return RxHandlers.selectionChange(source).map(e -> source.getSelectedSet())
      .startWith(defer(() -> just(source.getSelectedSet())));
}

代码示例来源:origin: nurkiewicz/rxjava-book-examples

@Test
public void sample_367() throws Exception {
  Observable
      .just(1, 2)
      .startWith(0)
      .subscribe(System.out::println);
}

代码示例来源:origin: nurkiewicz/rxjava-book-examples

Observable<String> speak(String quote, long millisPerChar) {
  String[] tokens = quote.replaceAll("[:,]", "").split(" ");
  Observable<String> words = Observable.from(tokens);
  Observable<Long> absoluteDelay = words
      .map(String::length)
      .map(len -> len * millisPerChar)
      .scan((total, current) -> total + current);
  return words
      .zipWith(absoluteDelay.startWith(0L), Pair::of)
      .flatMap(pair -> just(pair.getLeft())
          .delay(pair.getRight(), MILLISECONDS));
}

代码示例来源:origin: io.requery/requery

static <T> Observable<RxResult<T>> toResultObservable(final RxResult<T> result) {
    final QueryElement<?> element = result.unwrapQuery();
    // ensure the transaction listener is added in the target data store
    result.addTransactionListener(typeChanges);
    return typeChanges.commitSubject()
      .filter(new Func1<Set<Type<?>>, Boolean>() {
        @Override
        public Boolean call(Set<Type<?>> types) {
          return !Collections.disjoint(element.entityTypes(), types) ||
            Types.referencesType(element.entityTypes(), types);
        }
      }).map(new Func1<Set<Type<?>>, RxResult<T>>() {
        @Override
        public RxResult<T> call(Set<Type<?>> types) {
          return result;
        }
      }).startWith(result);
  }
}

代码示例来源:origin: com.netflix.falcor/falcor-router

/**
 * Completes the request with the given reference and paths. Continues handling of the reference path followed by
 * the unmatched path via alternative route.
 */
public Observable<RouteResult> ref(FalcorPath matched, PathTree unmatched, FalcorPath path, Route<R> route) {
  return route.call(withPaths(path, unmatched)).startWith(Complete(matched, PathTree.empty(), new PathValue(matched, new Ref(path))));
}

代码示例来源:origin: nurkiewicz/rxjava-book-examples

@Test
public void sample_355() throws Exception {
  Observable<String> fast = interval(10, MILLISECONDS)
      .map(x -> "F" + x)
      .delay(100, MILLISECONDS)
      .startWith("FX");
  Observable<String> slow = interval(17, MILLISECONDS).map(x -> "S" + x);
  slow
      .withLatestFrom(fast, (s, f) -> s + ":" + f)
      .forEach(System.out::println);
}

相关文章

Observable类方法