Returns an Observable that emits the items in a specified Iterable before it begins to emit items emitted by the source Observable.
Scheduler: startWith does not operate by default on a particular Scheduler.
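The ordering described above can be sketched without RxJava on the classpath. The following is only a plain-Java model built on `java.util.stream`, not RxJava's implementation; the class `StartWithSketch` and its `startWith` helper are hypothetical names introduced for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

// Plain-Java model of startWith(Iterable); an illustration, not RxJava code.
final class StartWithSketch {

    // Hypothetical helper: yields every item of `prefix`, then every item of `source`.
    static <T> Stream<T> startWith(Iterable<T> prefix, Stream<T> source) {
        Stream<T> head = StreamSupport.stream(prefix.spliterator(), false);
        return Stream.concat(head, source);
    }

    public static void main(String[] args) {
        List<Integer> result = startWith(Arrays.asList(1, 2), Stream.of(3, 4, 5))
                .collect(Collectors.toList());
        System.out.println(result); // prints [1, 2, 3, 4, 5]
    }
}
```

The prefix items always come first and keep their own order; the source items follow unchanged.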
Code example source: yahoo/squidb
private <T> Observable<T> observeAndEmit(final T objectToEmit, Func1<Set<SqlTable<?>>, Boolean> tableFilter,
        boolean emitOnFirstSubscribe) {
    Observable<Set<SqlTable<?>>> observable = changedTablePublisher.filter(tableFilter);
    if (emitOnFirstSubscribe) {
        // Emit once on subscribe, before any table change arrives.
        observable = observable.startWith(INITIAL_TABLE);
    }
    return observable.map(new Func1<Set<SqlTable<?>>, T>() {
        @Override
        public T call(Set<SqlTable<?>> sqlTables) {
            return objectToEmit;
        }
    });
}
Code example source: PipelineAI/pipeline
public Observable<Bucket> call() {
    return inputEventStream
            .window(bucketSizeInMs, TimeUnit.MILLISECONDS) // bucket by the counter window so the next operator receives time chunks, not every OnNext
            .flatMap(reduceBucketToSummary)                // turn each bucket into a long array containing counts of event types
            .startWith(emptyEventCountsToStart);           // start with empty arrays so consumer logic stays generic (windows are always full)
}
Code example source: PipelineAI/pipeline
protected RollingConcurrencyStream(final HystrixEventStream<HystrixCommandExecutionStarted> inputEventStream, final int numBuckets, final int bucketSizeInMs) {
final List<Integer> emptyRollingMaxBuckets = new ArrayList<Integer>();
for (int i = 0; i < numBuckets; i++) {
rollingMaxStream = inputEventStream
.window(bucketSizeInMs, TimeUnit.MILLISECONDS)
.window(numBuckets, 1)
Code example source: PipelineAI/pipeline
protected RollingDistributionStream(final HystrixEventStream<Event> stream, final int numBuckets, final int bucketSizeInMs,
final Func2<Histogram, Event, Histogram> addValuesToBucket) {
final List<Histogram> emptyDistributionsToStart = new ArrayList<Histogram>();
for (int i = 0; i < numBuckets; i++) {
final Func1<Observable<Event>, Observable<Histogram>> reduceBucketToSingleDistribution = new Func1<Observable<Event>, Observable<Histogram>>() {
public Observable<Histogram> call(Observable<Event> bucket) {
return bucket.reduce(CachedValuesHistogram.getNewHistogram(), addValuesToBucket);
rollingDistributionStream = stream
.window(bucketSizeInMs, TimeUnit.MILLISECONDS) //stream of unaggregated buckets
.flatMap(reduceBucketToSingleDistribution) //stream of aggregated Histograms
.startWith(emptyDistributionsToStart) //stream of aggregated Histograms that starts with n empty
.window(numBuckets, 1) //windowed stream: each OnNext is a stream of n Histograms
.flatMap(reduceWindowToSingleDistribution) //reduced stream: each OnNext is a single Histogram
.map(cacheHistogramValues) //convert to CachedValueHistogram (commonly-accessed values are cached)
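The comments in the pipeline above describe pre-filling the stream with n empty buckets so that every rolling window is full from the start. A minimal plain-Java sketch of that idea follows, using integer sums in place of histograms. `RollingWindowSketch` and `rollingSums` are hypothetical names, and the sketch only models full windows; it is not the Hystrix implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Models startWith(n empty buckets) followed by a sliding window of size n, step 1.
final class RollingWindowSketch {

    // Hypothetical helper: returns the sum of each full window of `numBuckets` items.
    static List<Integer> rollingSums(List<Integer> buckets, int numBuckets) {
        // Pad the front with empty (zero) buckets, the role startWith plays above.
        List<Integer> padded = new ArrayList<>(Collections.nCopies(numBuckets, 0));
        padded.addAll(buckets);

        List<Integer> sums = new ArrayList<>();
        for (int start = 0; start + numBuckets <= padded.size(); start++) {
            int sum = 0;
            for (int value : padded.subList(start, start + numBuckets)) {
                sum += value;
            }
            sums.add(sum);
        }
        return sums;
    }

    public static void main(String[] args) {
        // Three real buckets, rolling window of three.
        System.out.println(rollingSums(Arrays.asList(5, 2, 7), 3)); // prints [0, 5, 7, 14]
    }
}
```

Without the padding, the first result would only be available after three real buckets had arrived; with it, a consumer sees a value for every bucket, including the very first.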
Code example source: konmik/nucleus
.startWith(Observable.range(0, requestedPageCount))
.concatMap(new Func1<Integer, Observable<String>>() {
Code example source: jhusain/learnrxjava
public static void main(String[] args) {
    // window every 500 ms (999999999 marks the start of each window's output)
    hotStream().window(500, TimeUnit.MILLISECONDS).take(10)
            .flatMap(w -> w.startWith(999999999))
            .toBlocking().forEach(System.out::println);
    // window 10 items at a time (999999999 marks the start of each window's output)
    hotStream().window(10).take(2)
            .flatMap(w -> w.startWith(999999999))
            .toBlocking().forEach(System.out::println);
}
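To make the effect of the marker visible, here is a plain-Java sketch of the count-based case: the input is split into fixed-size windows and each window is prefixed with a marker value. `WindowMarkerSketch` and `markWindows` are hypothetical names, and the sketch models only the counted variant, not timed windows.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Models window(size) followed by flatMap(w -> w.startWith(marker)) on a finite list.
final class WindowMarkerSketch {

    // Hypothetical helper: splits `items` into consecutive windows of `size`
    // and emits `marker` before the contents of each window.
    static List<Integer> markWindows(List<Integer> items, int size, int marker) {
        List<Integer> out = new ArrayList<>();
        for (int start = 0; start < items.size(); start += size) {
            out.add(marker);
            int end = Math.min(start + size, items.size());
            out.addAll(items.subList(start, end));
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(markWindows(Arrays.asList(1, 2, 3, 4, 5), 2, 999999999));
        // prints [999999999, 1, 2, 999999999, 3, 4, 999999999, 5]
    }
}
```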
Code example source: ribot/ribot-app-android
/**
 * Retrieve list of venues. Behaviour:
 * 1. Return cached venues (empty list if none is cached)
 * 2. Return API venues (if different to cached ones)
 * 3. Save new venues from API in cache
 * 5. If an error happens and cache is not empty, returns venues from cache.
 */
public Observable<List<Venue>> getVenues() {
String auth = RibotService.Util.buildAuthorization(mPreferencesHelper.getAccessToken());
return mRibotService.getVenues(auth)
.doOnNext(new Action1<List<Venue>>() {
public void call(List<Venue> venues) {
.onErrorResumeNext(new Func1<Throwable, Observable<? extends List<Venue>>>() {
public Observable<? extends List<Venue>> call(Throwable throwable) {
return getVenuesRecoveryObservable(throwable);
Code example source: henrymorgen/android-advanced-light
private void startWith() {
    // Emits 1 and 2 first, then the source items 3, 4 and 5.
    Observable.just(3, 4, 5).startWith(1, 2)
            .subscribe(new Action1<Integer>() {
                @Override
                public void call(Integer integer) {
                    Log.d(TAG, "startWith:" + integer);
                }
            });
}
Code example source: com.intendia.gwt.rxgwt/rxgwt
/** An observable that starts with the source's current value and then emits each value change. */
public static <T, V extends HasValueChangeHandlers<T>> Observable<T> bindValueChange(V source, Function<V, T> get) {
    return RxHandlers.valueChange(source).map(ValueChangeEvent::getValue)
            .startWith(defer(() -> just(get.apply(source))));
}
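Wrapping the initial value in `defer` means the current value is read when the stream is consumed rather than when the pipeline is built. The plain-Java sketch below models that timing with a `Supplier` and a lazy `java.util.stream` pipeline; `DeferredStartSketch` and `startWithDeferred` are hypothetical names, and this is an illustration of the timing only, not rxgwt or RxJava code.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Models startWith(defer(() -> just(currentValue))) with a lazily evaluated Supplier.
final class DeferredStartSketch {

    // Hypothetical helper: `current` is invoked only when the returned stream is consumed.
    static <T> Stream<T> startWithDeferred(Supplier<T> current, Stream<T> changes) {
        Stream<T> head = Stream.of(current).map(Supplier::get);
        return Stream.concat(head, changes);
    }

    public static void main(String[] args) {
        AtomicReference<String> field = new AtomicReference<>("old");
        Stream<String> values = startWithDeferred(field::get, Stream.of("edit-1", "edit-2"));

        // The value changes after the pipeline is built but before it is consumed.
        field.set("new");

        List<String> seen = values.collect(Collectors.toList());
        System.out.println(seen); // prints [new, edit-1, edit-2]
    }
}
```

Had the initial value been captured eagerly, the consumer would have started with the stale value "old".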
Code example source: com.netflix.hystrix/hystrix-core
public Observable<Bucket> call() {
    return inputEventStream
            .window(bucketSizeInMs, TimeUnit.MILLISECONDS) // bucket by the counter window so the next operator receives time chunks, not every OnNext
            .flatMap(reduceBucketToSummary)                // turn each bucket into a long array containing counts of event types
            .startWith(emptyEventCountsToStart);           // start with empty arrays so consumer logic stays generic (windows are always full)
}
Code example source: Laimiux/rxnetwork-android
/**
 * Creates an observable that listens to connectivity changes.
 */
public static Observable<Boolean> stream(Context context) {
    final Context applicationContext = context.getApplicationContext();
    final IntentFilter action = new IntentFilter(ConnectivityManager.CONNECTIVITY_ACTION);
    return ContentObservable.fromBroadcast(context, action)
            // Emit a placeholder first so the initial connectivity status is reported on subscribe
            .startWith((Intent) null)
            .map(new Func1<Intent, Boolean>() {
                @Override public Boolean call(Intent ignored) {
                    return getConnectivityStatus(applicationContext);
                }
            });
}
Code example source: PhilippeBoisney/SimpleDroidRx
public void streamLongTask(){
subscription = getObservableOnNormalTask()
Code example source: com.netflix.hystrix/hystrix-core
protected RollingConcurrencyStream(final HystrixEventStream<HystrixCommandExecutionStarted> inputEventStream, final int numBuckets, final int bucketSizeInMs) {
final List<Integer> emptyRollingMaxBuckets = new ArrayList<Integer>();
for (int i = 0; i < numBuckets; i++) {
rollingMaxStream = inputEventStream
.window(bucketSizeInMs, TimeUnit.MILLISECONDS)
.window(numBuckets, 1)
Code example source: com.intendia.gwt.rxgwt/rxgwt
public static <T> Observable<T> bindSingleSelectionChange(SingleSelectionModel<T> source) {
    return RxHandlers.selectionChange(source).map(e -> source.getSelectedObject())
            .startWith(defer(() -> just(source.getSelectedObject())));
}
Code example source: com.intendia.gwt.rxgwt/rxgwt
public static <T> Observable<Set<T>> bindSetSelectionChange(SetSelectionModel<T> source) {
    return RxHandlers.selectionChange(source).map(e -> source.getSelectedSet())
            .startWith(defer(() -> just(source.getSelectedSet())));
}
Code example source: nurkiewicz/rxjava-book-examples
public void sample_367() throws Exception {
.just(1, 2)
Code example source: nurkiewicz/rxjava-book-examples
Observable<String> speak(String quote, long millisPerChar) {
    String[] tokens = quote.replaceAll("[:,]", "").split(" ");
    Observable<String> words = Observable.from(tokens);
    Observable<Long> absoluteDelay = words
            .map(String::length)              // word -> its length, so the next step can multiply
            .map(len -> len * millisPerChar)
            .scan((total, current) -> total + current);
    return words
            // startWith(0L) shifts the running totals so the first word has no delay
            .zipWith(absoluteDelay.startWith(0L), Pair::of)
            .flatMap(pair -> just(pair.getLeft())
                    .delay(pair.getRight(), MILLISECONDS));
}
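The delay arithmetic in `speak` can be checked by hand: each word is delayed by the total length of the words before it, times `millisPerChar`. The following plain-Java sketch reproduces only that arithmetic, with no timing or RxJava involved; `SpeakDelaySketch` and `schedule` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// Computes the per-word delays that scan + startWith(0L) + zipWith produce in speak().
final class SpeakDelaySketch {

    // Hypothetical helper: returns entries of the form "word@delayMillis".
    static List<String> schedule(String quote, long millisPerChar) {
        String[] tokens = quote.replaceAll("[:,]", "").split(" ");
        List<String> out = new ArrayList<>();
        long delay = 0L; // plays the role of startWith(0L): the first word is not delayed
        for (String word : tokens) {
            out.add(word + "@" + delay);
            delay += word.length() * millisPerChar; // running total, like scan
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(schedule("To be, or not", 10)); // prints [To@0, be@20, or@40, not@60]
    }
}
```

Without the leading zero, the running totals would pair each word with its own length included, so every word would arrive one word late.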
Code example source: io.requery/requery
static <T> Observable<RxResult<T>> toResultObservable(final RxResult<T> result) {
final QueryElement<?> element = result.unwrapQuery();
// ensure the transaction listener is added in the target data store
return typeChanges.commitSubject()
.filter(new Func1<Set<Type<?>>, Boolean>() {
public Boolean call(Set<Type<?>> types) {
return !Collections.disjoint(element.entityTypes(), types) ||
Types.referencesType(element.entityTypes(), types);
}).map(new Func1<Set<Type<?>>, RxResult<T>>() {
public RxResult<T> call(Set<Type<?>> types) {
return result;
Code example source: com.netflix.falcor/falcor-router
/**
 * Completes the request with the given reference and paths. Continues handling of the reference path followed by
 * the unmatched path via an alternative route.
 */
public Observable<RouteResult> ref(FalcorPath matched, PathTree unmatched, FalcorPath path, Route<R> route) {
    return route.call(withPaths(path, unmatched))
            .startWith(Complete(matched, PathTree.empty(), new PathValue(matched, new Ref(path))));
}
Code example source: nurkiewicz/rxjava-book-examples
public void sample_355() throws Exception {
Observable<String> fast = interval(10, MILLISECONDS)
.map(x -> "F" + x)
.delay(100, MILLISECONDS)
Observable<String> slow = interval(17, MILLISECONDS).map(x -> "S" + x);
.withLatestFrom(fast, (s, f) -> s + ":" + f)