io.reactivex.Observable.startWith()方法的使用及代码示例

x33g5p2x  于2022-01-25 转载在 其他  
字(7.2k)|赞(0)|评价(0)|浏览(94)

本文整理了Java中io.reactivex.Observable.startWith()方法的一些代码示例,展示了Observable.startWith()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Observable.startWith()方法的具体详情如下:
包路径:io.reactivex.Observable
类名称:Observable
方法名:startWith

Observable.startWith介绍

[英]Returns an Observable that emits the items in a specified ObservableSource before it begins to emit items emitted by the source ObservableSource.

Scheduler: startWith does not operate by default on a particular Scheduler.
[中]返回一个可观察对象,该对象在开始发射源可观察资源发射的项之前发射指定可观察资源中的项。
调度程序:startWith默认情况下不会在特定调度程序上运行。

代码示例

代码示例来源:origin: ReactiveX/RxJava

@Override
  public Observable<Integer> apply(Observable<Integer> w) {
    return w.startWith(indicator);
  }
}).subscribe(to);

代码示例来源:origin: Polidea/RxAndroidBle

public static <T> Observable<T> justOnNext(T onNext) {
  return Observable.<T>never().startWith(onNext);
}

代码示例来源:origin: ReactiveX/RxJava

@Override
  public Observable<Integer> apply(Observable<Integer> w) {
    return w.startWith(indicator)
        .doOnComplete(new Action() {
          @Override
          public void run() {
            System.out.println("inner done: " + wip.incrementAndGet());
          }
        })
        ;
  }
})

代码示例来源:origin: Polidea/RxAndroidBle

private Observable<RxBleConnection> emitConnectionWithoutCompleting() {
  return Observable.<RxBleConnection>never().startWith(rxBleConnection);
}

代码示例来源:origin: ReactiveX/RxJava

@Override
  public Observable<Object> apply(Observable<? extends Throwable> t1) {
    return t1.map(new Function<Throwable, Integer>() {
      @Override
      public Integer apply(Throwable t1) {
        return 0;
      }
    }).startWith(0).cast(Object.class);
  }
}).subscribe(observer);

代码示例来源:origin: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void startWithSingleNull() {
  just1.startWith((Integer)null);
}

代码示例来源:origin: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void startWithObservableNull() {
  just1.startWith((Observable<Integer>)null);
}

代码示例来源:origin: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void startWithIterableNull() {
  just1.startWith((Iterable<Integer>)null);
}

代码示例来源:origin: sockeqwe/mosby

public Observable<String> intent1() {
  return Observable.just("Intent 1").startWith("Before Intent 1");
}

代码示例来源:origin: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void startWithIterableOneNull() {
  just1.startWith(Arrays.asList(1, null)).blockingSubscribe();
}

代码示例来源:origin: square/sqlbrite

@CheckResult @NonNull
private QueryObservable createQuery(DatabaseQuery query) {
 if (transactions.get() != null) {
  throw new IllegalStateException("Cannot create observable query in transaction. "
    + "Use query() for a query inside a transaction.");
 }
 return triggers //
   .filter(query) // DatabaseQuery filters triggers to on tables we care about.
   .map(query) // DatabaseQuery maps to itself to save an allocation.
   .startWith(query) //
   .observeOn(scheduler) //
   .compose(queryTransformer) // Apply the user's query transformer.
   .doOnSubscribe(ensureNotInTransaction)
   .to(QUERY_OBSERVABLE);
}

代码示例来源:origin: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void startWithIterableIteratorNull() {
  just1.startWith(new Iterable<Integer>() {
    @Override
    public Iterator<Integer> iterator() {
      return null;
    }
  }).blockingSubscribe();
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void startWithIterable() {
  List<String> li = new ArrayList<String>();
  li.add("alpha");
  li.add("beta");
  List<String> values = Observable.just("one", "two").startWith(li).toList().blockingGet();
  assertEquals("alpha", values.get(0));
  assertEquals("beta", values.get(1));
  assertEquals("one", values.get(2));
  assertEquals("two", values.get(3));
}

代码示例来源:origin: ReactiveX/RxJava

@Override
  public Observable<Movie> apply(Observable<List<Movie>> movieList) {
    return movieList
      .startWith(new ArrayList<Movie>())
      .buffer(2, 1)
      .skip(1)
      .flatMap(calculateDelta);
  }
};

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testStartWithWithScheduler() {
  TestScheduler scheduler = new TestScheduler();
  Observable<Integer> o = Observable.just(3, 4).startWith(Arrays.asList(1, 2)).subscribeOn(scheduler);
  Observer<Integer> observer = TestHelper.mockObserver();
  o.subscribe(observer);
  scheduler.advanceTimeBy(1, TimeUnit.MILLISECONDS);
  InOrder inOrder = inOrder(observer);
  inOrder.verify(observer, times(1)).onNext(1);
  inOrder.verify(observer, times(1)).onNext(2);
  inOrder.verify(observer, times(1)).onNext(3);
  inOrder.verify(observer, times(1)).onNext(4);
  inOrder.verify(observer, times(1)).onComplete();
  inOrder.verifyNoMoreInteractions();
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void startWithObservable() {
  List<String> li = new ArrayList<String>();
  li.add("alpha");
  li.add("beta");
  List<String> values = Observable.just("one", "two")
      .startWith(Observable.fromIterable(li))
      .toList()
      .blockingGet();
  assertEquals("alpha", values.get(0));
  assertEquals("beta", values.get(1));
  assertEquals("one", values.get(2));
  assertEquals("two", values.get(3));
}

代码示例来源:origin: pwittchen/ReactiveNetwork

@Override public Observable<Connectivity> observeNetworkConnectivity(final Context context) {
 final String service = Context.CONNECTIVITY_SERVICE;
 final ConnectivityManager manager = (ConnectivityManager) context.getSystemService(service);
 return Observable.create(new ObservableOnSubscribe<Connectivity>() {
  @Override public void subscribe(ObservableEmitter<Connectivity> subscriber) throws Exception {
   networkCallback = createNetworkCallback(subscriber, context);
   final NetworkRequest networkRequest = new NetworkRequest.Builder().build();
   manager.registerNetworkCallback(networkRequest, networkCallback);
  }
 }).doOnDispose(new Action() {
  @Override public void run() {
   tryToUnregisterCallback(manager);
  }
 }).startWith(Connectivity.create(context)).distinctUntilChanged();
}

代码示例来源:origin: requery/requery

static <T> Observable<ReactiveResult<T>> toObservableResult(final ReactiveResult<T> result) {
    final QueryElement<?> element = result.unwrapQuery();
    // ensure the transaction listener is added in the target data store
    result.addTransactionListener(typeChanges);
    return typeChanges.commitSubject()
      .filter(new Predicate<Set<Type<?>>>() {
        @Override
        public boolean test(Set<Type<?>> types) {
          return !Collections.disjoint(element.entityTypes(), types) ||
              Types.referencesType(element.entityTypes(), types);
        }
      }).map(new Function<Set<Type<?>>, ReactiveResult<T>>() {
        @Override
        public ReactiveResult<T> apply(Set<Type<?>> types) {
          return result;
        }
      }).startWith(result);
  }
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testErrorInParentObservable() {
  TestObserver<Integer> to = new TestObserver<Integer>();
  Observable.mergeDelayError(
      Observable.just(Observable.just(1), Observable.just(2))
          .startWith(Observable.<Integer> error(new RuntimeException()))
      ).subscribe(to);
  to.awaitTerminalEvent();
  to.assertTerminated();
  to.assertValues(1, 2);
  assertEquals(1, to.errorCount());
}

代码示例来源:origin: ReactiveX/RxJava

@SuppressWarnings("unchecked")
@Test
public void errorDelayed2() {
  Observable.combineLatestDelayError(
      new Function<Object[], Object>() {
        @Override
        public Object apply(Object[] a) throws Exception {
          return a;
        }
      },
      128,
      Observable.error(new TestException()).startWith(1),
      Observable.empty()
  )
  .test()
  .assertFailure(TestException.class);
}

相关文章

Observable类方法