[英]Returns a Flowable that emits windows of items it collects from the source Publisher. The resulting Publisher emits connected, non-overlapping windows, each containing count items. When the source Publisher completes or encounters an error, the resulting Publisher emits the current window and propagates the notification from the source Publisher.
Backpressure: The operator honors backpressure of its inner and outer subscribers, however, the inner Publisher uses an unbounded buffer that may hold at most count elements. Scheduler: This version of window does not operate by default on a particular Scheduler.
代码示例来源:origin: ReactiveX/RxJava
public Flowable<Flowable<Object>> apply(Flowable<Object> f) throws Exception {
return f.window(1);
代码示例来源:origin: ReactiveX/RxJava
public Flowable<Flowable<Object>> apply(Flowable<Object> f) throws Exception {
return f.window(1, 2);
代码示例来源:origin: ReactiveX/RxJava
@Test(expected = NullPointerException.class)
public void windowOpenCloseOpenNull() {
just1.window(null, new Function<Object, Publisher<Integer>>() {
public Publisher<Integer> apply(Object v) {
return just1;
代码示例来源:origin: ReactiveX/RxJava
public Object apply(Flowable<Object> f) throws Exception {
return f.window(Flowable.never()).flatMap(new Function<Flowable<Object>, Flowable<Object>>() {
public Flowable<Object> apply(Flowable<Object> v) throws Exception {
return v;
}, false, 1, 1, 1);
代码示例来源:origin: ReactiveX/RxJava
@Test(expected = NullPointerException.class)
public void windowBoundarySupplierReturnsNull() {
just1.window(new Callable<Publisher<Object>>() {
public Publisher<Object> call() {
return null;
代码示例来源:origin: ReactiveX/RxJava
public Publisher<Flowable<Object>> apply(Flowable<Object> f)
throws Exception {
return f.window(Flowable.never()).takeLast(1);
代码示例来源:origin: ReactiveX/RxJava
@Test(expected = NullPointerException.class)
public void windowOpenCloseCloseReturnsNull() {
Flowable.never().window(just1, new Function<Integer, Publisher<Object>>() {
public Publisher<Object> apply(Integer v) {
return null;
代码示例来源:origin: ReactiveX/RxJava
public void invalidSpan() {
try {
Flowable.just(1).window(-99, 1, TimeUnit.SECONDS);
fail("Should have thrown!");
} catch (IllegalArgumentException ex) {
assertEquals("timespan > 0 required but it was -99", ex.getMessage());
代码示例来源:origin: ReactiveX/RxJava
public Object apply(Flowable<Object> f) throws Exception {
return f.window(Functions.justCallable(Flowable.never())).flatMap(new Function<Flowable<Object>, Flowable<Object>>() {
public Flowable<Object> apply(Flowable<Object> v) throws Exception {
return v;
}, false, 1, 1, 1);
代码示例来源:origin: ReactiveX/RxJava
public void testNonOverlappingWindows() {
Flowable<String> subject = Flowable.just("one", "two", "three", "four", "five");
Flowable<Flowable<String>> windowed = subject.window(3);
List<List<String>> windows = toLists(windowed);
assertEquals(2, windows.size());
assertEquals(list("one", "two", "three"), windows.get(0));
assertEquals(list("four", "five"), windows.get(1));
代码示例来源:origin: ReactiveX/RxJava
public void testSkipAndCountGaplessWindows() {
Flowable<String> subject = Flowable.just("one", "two", "three", "four", "five");
Flowable<Flowable<String>> windowed = subject.window(3, 3);
List<List<String>> windows = toLists(windowed);
assertEquals(2, windows.size());
assertEquals(list("one", "two", "three"), windows.get(0));
assertEquals(list("four", "five"), windows.get(1));
代码示例来源:origin: ReactiveX/RxJava
public void firstWindowMissingBackpressure() {
.window(1, TimeUnit.SECONDS, 1)
代码示例来源:origin: ReactiveX/RxJava
public void timespanTimeskipCustomSchedulerBufferSize() {
Flowable.range(1, 10)
.window(1, 1, TimeUnit.MINUTES, Schedulers.io(), 2)
.awaitDone(5, TimeUnit.SECONDS)
.assertResult(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
代码示例来源:origin: ReactiveX/RxJava
public void mainError() {
Flowable.error(new TestException())
代码示例来源:origin: ReactiveX/RxJava
public void timespanTimeskipDefaultScheduler() {
.window(1, 1, TimeUnit.MINUTES)
.awaitDone(5, TimeUnit.SECONDS)
代码示例来源:origin: ReactiveX/RxJava
public void restartTimer() {
Flowable.range(1, 5)
.window(1, TimeUnit.DAYS, Schedulers.single(), 2, true)
.assertResult(1, 2, 3, 4, 5);
代码示例来源:origin: ReactiveX/RxJava
public void boundaryOnError() {
TestSubscriber<Object> ts = Flowable.error(new TestException())
.flatMap(Functions.<Flowable<Object>>identity(), true)
List<Throwable> errors = TestHelper.compositeList(ts.errors().get(0));
TestHelper.assertError(errors, 0, TestException.class);
代码示例来源:origin: ReactiveX/RxJava
public void timeskipJustOverlap() {
.window(2, 1, TimeUnit.MINUTES, Schedulers.single())
.awaitDone(5, TimeUnit.SECONDS)
代码示例来源:origin: ReactiveX/RxJava
public void exactBoundaryError() {
Flowable.error(new TestException())
.window(1, TimeUnit.DAYS, Schedulers.single(), 2, true)
代码示例来源:origin: ReactiveX/RxJava
public void mainError() {
Flowable.<Integer>error(new TestException())
.window(Flowable.never(), Functions.justFunction(Flowable.just(1)))