本文整理了Java中io.reactivex.Observable.range()
方法的一些代码示例,展示了Observable.range()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Observable.range()
方法的具体详情如下:
包路径:io.reactivex.Observable
类名称:Observable
方法名:range
[英]Returns an Observable that emits a sequence of Integers within a specified range.
Scheduler: range does not operate by default on a particular Scheduler.
[中]返回在指定范围内发出整数序列的可观测值。
调度程序:默认情况下,范围不会在特定调度程序上运行。
代码示例来源:origin: ReactiveX/RxJava
@Override
public ObservableSource<Integer> apply(Integer t) {
return Observable.range(t, 2);
}
}, true)
代码示例来源:origin: ReactiveX/RxJava
@Override
public ObservableSource<Integer> apply(Integer v) throws Exception {
return Observable.range(1, 2);
}
})
代码示例来源:origin: ReactiveX/RxJava
@Override
public ObservableSource<Integer> apply(Integer v) throws Exception {
return Observable.range(v, 2);
}
})
代码示例来源:origin: ReactiveX/RxJava
@Override
public ObservableSource<Integer> apply(Integer v) throws Exception {
return Observable.range(v, 2);
}
})
代码示例来源:origin: ReactiveX/RxJava
@Override
public ObservableSource<Object> apply(Integer v) throws Exception {
return Observable.range(1, 2).map(new Function<Integer, Object>() {
@Override
public Object apply(Integer w) throws Exception {
throw new TestException();
}
});
}
})
代码示例来源:origin: ReactiveX/RxJava
private Observable<Integer> mergeNAsyncStreamsOfN(final int outerSize, final int innerSize) {
Observable<Observable<Integer>> os = Observable.range(1, outerSize)
.map(new Function<Integer, Observable<Integer>>() {
@Override
public Observable<Integer> apply(Integer i) {
return Observable.range(1, innerSize).subscribeOn(Schedulers.computation());
}
});
return Observable.merge(os);
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void fusedReject() {
TestObserver<Integer> to = ObserverFusion.newTest(QueueFuseable.ANY | QueueFuseable.BOUNDARY);
Observable.range(1, 5)
.map(Functions.<Integer>identity())
.subscribe(to);
ObserverFusion.assertFusion(to, QueueFuseable.NONE)
.assertResult(1, 2, 3, 4, 5);
}
代码示例来源:origin: ReactiveX/RxJava
@Override
public Observable<Integer> apply(Integer t) {
return Observable.range(1, Observable.bufferSize() * 2)
.doOnNext(new Consumer<Integer>() {
@Override
public void accept(Integer t) {
count.getAndIncrement();
}
}).hide();
}
}).subscribe(to);
代码示例来源:origin: ReactiveX/RxJava
@SuppressWarnings("unchecked")
@Test
public void bufferTimeSkipDefault() {
Observable.range(1, 5).buffer(1, 1, TimeUnit.MINUTES)
.test()
.assertResult(Arrays.asList(1, 2, 3, 4, 5));
}
代码示例来源:origin: ReactiveX/RxJava
@SuppressWarnings("unchecked")
@Test
public void restartTimer() {
Observable.range(1, 5)
.buffer(1, TimeUnit.DAYS, Schedulers.single(), 2, Functions.<Integer>createArrayList(16), true)
.test()
.assertResult(Arrays.asList(1, 2), Arrays.asList(3, 4), Arrays.asList(5));
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void asyncFusedRejected() {
TestObserver<Integer> to0 = ObserverFusion.newTest(QueueFuseable.ASYNC);
Observable.range(1, 5)
.doAfterNext(afterNext)
.subscribe(to0);
ObserverFusion.assertFusion(to0, QueueFuseable.NONE)
.assertResult(1, 2, 3, 4, 5);
assertEquals(Arrays.asList(-1, -2, -3, -4, -5), values);
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void testSimple2() {
Observable.range(1, 100).concatMapEager(toRange).subscribe(to);
to.assertNoErrors();
to.assertValueCount(200);
to.assertComplete();
}
代码示例来源:origin: ReactiveX/RxJava
@SuppressWarnings("unchecked")
@Test
public void bufferBoundaryHint() {
Observable.range(1, 5).buffer(Observable.timer(1, TimeUnit.MINUTES), 2)
.test()
.assertResult(Arrays.asList(1, 2, 3, 4, 5));
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void normalDelayBoundary() {
Observable.range(1, 5)
.concatMapEagerDelayError(new Function<Integer, ObservableSource<Integer>>() {
@Override
public ObservableSource<Integer> apply(Integer t) {
return Observable.range(t, 2);
}
}, false)
.test()
.assertResult(1, 2, 2, 3, 3, 4, 4, 5, 5, 6);
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void otherError() {
final TestObserver<Integer> to = new TestObserver<Integer>();
Observable.range(1, 5)
.concatWith(Completable.error(new TestException()))
.subscribe(to);
to.assertFailure(TestException.class, 1, 2, 3, 4, 5);
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void normal() {
Observable.range(1, 5)
.mergeWith(Single.just(100))
.test()
.assertResult(1, 2, 3, 4, 5, 100);
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void simple() {
Observable.range(1, 5)
.concatMapCompletable(Functions.justFunction(Completable.complete()))
.test()
.assertResult();
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void testTake() {
TestObserver<Integer> to = new TestObserver<Integer>();
ObservableCache<Integer> cached = new ObservableCache<Integer>(Observable.range(1, 1000), 16);
cached.take(10).subscribe(to);
to.assertNoErrors();
to.assertComplete();
to.assertValues(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
// ts.assertUnsubscribed(); // FIXME no longer valid
assertFalse(cached.hasObservers());
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void take() {
Observable<Integer> cache = Observable.range(1, 5).cache();
cache.take(2).test().assertResult(1, 2);
cache.take(3).test().assertResult(1, 2, 3);
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void syncFusedCompletable() {
Observable.range(1, 5).hide()
.switchMap(Functions.justFunction(
Completable.complete().toObservable()
))
.test()
.assertResult();
}
内容来源于网络,如有侵权,请联系作者删除!