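This article collects code examples for the Java method io.reactivex.Observable.startWith() and shows how Observable.startWith() is used in practice. The examples come mainly from GitHub, Stack Overflow, and Maven, and were extracted from selected open-source projects. Details of the Observable.startWith() method are as follows: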
Package: io.reactivex
Class name: Observable
Method name: startWith
Returns an Observable that emits the items in a specified ObservableSource before it begins to emit items emitted by the source ObservableSource.
Scheduler: startWith does not operate by default on a particular Scheduler.
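The description above can be illustrated with a minimal sketch, assuming RxJava 2.x (the io.reactivex package) is on the classpath. The class name StartWithOverloads and its helper methods are hypothetical and exist only for this illustration; the three startWith variants shown (single item, Iterable, ObservableSource) are the ones that also appear in the project examples in this article.

```java
import io.reactivex.Observable;

import java.util.Arrays;
import java.util.List;

public class StartWithOverloads {

    // Prepends a single item to the source sequence.
    static List<String> withItem() {
        return Observable.just("one", "two")
                .startWith("zero")
                .toList()
                .blockingGet();
    }

    // Prepends every element of an Iterable, in iteration order.
    static List<String> withIterable() {
        return Observable.just("one", "two")
                .startWith(Arrays.asList("alpha", "beta"))
                .toList()
                .blockingGet();
    }

    // Prepends everything the other ObservableSource emits before it completes.
    static List<String> withObservable() {
        return Observable.just("one", "two")
                .startWith(Observable.just("alpha", "beta"))
                .toList()
                .blockingGet();
    }

    public static void main(String[] args) {
        System.out.println(withItem());       // [zero, one, two]
        System.out.println(withIterable());   // [alpha, beta, one, two]
        System.out.println(withObservable()); // [alpha, beta, one, two]
    }
}
```

In every variant the prepended items arrive first and the source's own items follow in their original order.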
Code example source: ReactiveX/RxJava
    @Override
    public Observable<Integer> apply(Observable<Integer> w) {
        return w.startWith(indicator);
    }
}).subscribe(to);
Code example source: Polidea/RxAndroidBle
// Emits the given item and then never completes.
public static <T> Observable<T> justOnNext(T onNext) {
    return Observable.<T>never().startWith(onNext);
}
Code example source: ReactiveX/RxJava
    @Override
    public Observable<Integer> apply(Observable<Integer> w) {
        return w.startWith(indicator)
                .doOnComplete(new Action() {
                    @Override
                    public void run() {
                        System.out.println("inner done: " + wip.incrementAndGet());
                    }
                });
    }
})
Code example source: Polidea/RxAndroidBle
private Observable<RxBleConnection> emitConnectionWithoutCompleting() {
    return Observable.<RxBleConnection>never().startWith(rxBleConnection);
}
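The two RxAndroidBle snippets rely on the same idiom: Observable.never() emits nothing and never terminates, so prepending one item yields a stream that delivers that item and then stays open. A minimal sketch of this behaviour, assuming RxJava 2.x; the class name NeverStartWith and the sample value are hypothetical:

```java
import io.reactivex.Observable;
import io.reactivex.observers.TestObserver;

public class NeverStartWith {

    // Emits the given item and then stays open: Observable.never() sends
    // no further items and no terminal event.
    static <T> Observable<T> justOnNext(T item) {
        return Observable.<T>never().startWith(item);
    }

    public static void main(String[] args) {
        TestObserver<String> observer = justOnNext("connected").test();
        observer.assertValue("connected"); // the prepended item was delivered
        observer.assertNotComplete();      // no onComplete followed
        observer.assertNoErrors();
        observer.dispose();                // release the still-open subscription
    }
}
```

By contrast, Observable.just(item) would complete immediately after emitting, which is presumably why these helpers avoid it when the subscriber must stay subscribed.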
Code example source: ReactiveX/RxJava
    @Override
    public Observable<Object> apply(Observable<? extends Throwable> t1) {
        return t1.map(new Function<Throwable, Integer>() {
            @Override
            public Integer apply(Throwable t1) {
                return 0;
            }
        }).startWith(0).cast(Object.class);
    }
}).subscribe(observer);
Code example source: ReactiveX/RxJava
@Test(expected = NullPointerException.class)
public void startWithSingleNull() {
    just1.startWith((Integer)null);
}
Code example source: ReactiveX/RxJava
@Test(expected = NullPointerException.class)
public void startWithObservableNull() {
    just1.startWith((Observable<Integer>)null);
}
Code example source: ReactiveX/RxJava
@Test(expected = NullPointerException.class)
public void startWithIterableNull() {
    just1.startWith((Iterable<Integer>)null);
}
Code example source: sockeqwe/mosby
public Observable<String> intent1() {
    return Observable.just("Intent 1").startWith("Before Intent 1");
}
Code example source: ReactiveX/RxJava
@Test(expected = NullPointerException.class)
public void startWithIterableOneNull() {
    just1.startWith(Arrays.asList(1, null)).blockingSubscribe();
}
Code example source: square/sqlbrite
@CheckResult @NonNull
private QueryObservable createQuery(DatabaseQuery query) {
    if (transactions.get() != null) {
        throw new IllegalStateException("Cannot create observable query in transaction. "
                + "Use query() for a query inside a transaction.");
    }
    return triggers //
            .filter(query) // DatabaseQuery filters triggers to the tables we care about.
            .map(query) // DatabaseQuery maps to itself to save an allocation.
            .startWith(query) //
            .observeOn(scheduler) //
            .compose(queryTransformer) // Apply the user's query transformer.
            .doOnSubscribe(ensureNotInTransaction)
            .to(QUERY_OBSERVABLE);
}
Code example source: ReactiveX/RxJava
@Test(expected = NullPointerException.class)
public void startWithIterableIteratorNull() {
    just1.startWith(new Iterable<Integer>() {
        @Override
        public Iterator<Integer> iterator() {
            return null;
        }
    }).blockingSubscribe();
}
Code example source: ReactiveX/RxJava
@Test
public void startWithIterable() {
    List<String> li = new ArrayList<String>();
    li.add("alpha");
    li.add("beta");
    List<String> values = Observable.just("one", "two").startWith(li).toList().blockingGet();
    assertEquals("alpha", values.get(0));
    assertEquals("beta", values.get(1));
    assertEquals("one", values.get(2));
    assertEquals("two", values.get(3));
}
Code example source: ReactiveX/RxJava
    @Override
    public Observable<Movie> apply(Observable<List<Movie>> movieList) {
        return movieList
                .startWith(new ArrayList<Movie>())
                .buffer(2, 1)
                .skip(1)
                .flatMap(calculateDelta);
    }
};
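The snippet above seeds the stream with an initial value so that buffer(2, 1) can pair each element with its predecessor. A minimal sketch of that pairing idea on plain integers, assuming RxJava 2.x and Java 8 lambdas; the class name Deltas, the seed value 0, and the subtraction step are hypothetical stand-ins for the Movie list and calculateDelta, which are not shown in the source:

```java
import io.reactivex.Observable;

import java.util.List;

public class Deltas {

    // Computes the difference between each reading and the one before it.
    // startWith(0) supplies a seed so the first reading also has a predecessor.
    static List<Integer> deltas(Observable<Integer> readings) {
        return readings
                .startWith(0)                           // hypothetical seed value
                .buffer(2, 1)                           // overlapping windows [previous, current]
                .filter(pair -> pair.size() == 2)       // ignore an incomplete trailing window
                .map(pair -> pair.get(1) - pair.get(0)) // current minus previous
                .toList()
                .blockingGet();
    }

    public static void main(String[] args) {
        System.out.println(deltas(Observable.just(3, 5, 10))); // [3, 2, 5]
    }
}
```

Without the seed, the first element would have no predecessor and the first pair would start at the second element.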
Code example source: ReactiveX/RxJava
@Test
public void testStartWithWithScheduler() {
    TestScheduler scheduler = new TestScheduler();
    Observable<Integer> o = Observable.just(3, 4).startWith(Arrays.asList(1, 2)).subscribeOn(scheduler);
    Observer<Integer> observer = TestHelper.mockObserver();
    o.subscribe(observer);
    scheduler.advanceTimeBy(1, TimeUnit.MILLISECONDS);
    InOrder inOrder = inOrder(observer);
    inOrder.verify(observer, times(1)).onNext(1);
    inOrder.verify(observer, times(1)).onNext(2);
    inOrder.verify(observer, times(1)).onNext(3);
    inOrder.verify(observer, times(1)).onNext(4);
    inOrder.verify(observer, times(1)).onComplete();
    inOrder.verifyNoMoreInteractions();
}
Code example source: ReactiveX/RxJava
@Test
public void startWithObservable() {
    List<String> li = new ArrayList<String>();
    li.add("alpha");
    li.add("beta");
    List<String> values = Observable.just("one", "two")
            .startWith(Observable.fromIterable(li))
            .toList()
            .blockingGet();
    assertEquals("alpha", values.get(0));
    assertEquals("beta", values.get(1));
    assertEquals("one", values.get(2));
    assertEquals("two", values.get(3));
}
Code example source: pwittchen/ReactiveNetwork
@Override public Observable<Connectivity> observeNetworkConnectivity(final Context context) {
    final String service = Context.CONNECTIVITY_SERVICE;
    final ConnectivityManager manager = (ConnectivityManager) context.getSystemService(service);
    return Observable.create(new ObservableOnSubscribe<Connectivity>() {
        @Override public void subscribe(ObservableEmitter<Connectivity> subscriber) throws Exception {
            networkCallback = createNetworkCallback(subscriber, context);
            final NetworkRequest networkRequest = new NetworkRequest.Builder().build();
            manager.registerNetworkCallback(networkRequest, networkCallback);
        }
    }).doOnDispose(new Action() {
        @Override public void run() {
            tryToUnregisterCallback(manager);
        }
    }).startWith(Connectivity.create(context)).distinctUntilChanged();
}
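The ReactiveNetwork snippet combines startWith with distinctUntilChanged so that subscribers receive the current state immediately and a change event that merely repeats that state is suppressed. A minimal sketch of the same combination on booleans, assuming RxJava 2.x; the class name InitialState and the connectivity helper are hypothetical, with a Boolean standing in for the Connectivity type:

```java
import io.reactivex.Observable;

import java.util.List;

public class InitialState {

    // Emits the current state first, then later changes, dropping any value
    // that is equal to the one emitted immediately before it.
    static Observable<Boolean> connectivity(boolean current, Observable<Boolean> changes) {
        return changes.startWith(current).distinctUntilChanged();
    }

    public static void main(String[] args) {
        List<Boolean> seen = connectivity(true, Observable.just(true, false, false, true))
                .toList()
                .blockingGet();
        System.out.println(seen); // [true, false, true]
    }
}
```

The first change event (true) duplicates the initial state and is filtered out, as is the repeated false.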
Code example source: requery/requery
    static <T> Observable<ReactiveResult<T>> toObservableResult(final ReactiveResult<T> result) {
        final QueryElement<?> element = result.unwrapQuery();
        // ensure the transaction listener is added in the target data store
        result.addTransactionListener(typeChanges);
        return typeChanges.commitSubject()
                .filter(new Predicate<Set<Type<?>>>() {
                    @Override
                    public boolean test(Set<Type<?>> types) {
                        return !Collections.disjoint(element.entityTypes(), types) ||
                                Types.referencesType(element.entityTypes(), types);
                    }
                }).map(new Function<Set<Type<?>>, ReactiveResult<T>>() {
                    @Override
                    public ReactiveResult<T> apply(Set<Type<?>> types) {
                        return result;
                    }
                }).startWith(result);
    }
}
Code example source: ReactiveX/RxJava
@Test
public void testErrorInParentObservable() {
    TestObserver<Integer> to = new TestObserver<Integer>();
    Observable.mergeDelayError(
            Observable.just(Observable.just(1), Observable.just(2))
                    .startWith(Observable.<Integer> error(new RuntimeException()))
    ).subscribe(to);
    to.awaitTerminalEvent();
    to.assertTerminated();
    to.assertValues(1, 2);
    assertEquals(1, to.errorCount());
}
Code example source: ReactiveX/RxJava
@SuppressWarnings("unchecked")
@Test
public void errorDelayed2() {
    Observable.combineLatestDelayError(
            new Function<Object[], Object>() {
                @Override
                public Object apply(Object[] a) throws Exception {
                    return a;
                }
            },
            128,
            Observable.error(new TestException()).startWith(1),
            Observable.empty()
    )
    .test()
    .assertFailure(TestException.class);
}