Usage and code examples for the io.reactivex.Flowable.window() method

x33g5p2x, reposted on 2022-01-19 under Other

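This article collects code examples for the Java method io.reactivex.Flowable.window() and shows how Flowable.window() is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven and were extracted from selected projects, so they serve as useful references. Details of the Flowable.window() method:
Qualified name: io.reactivex.Flowable
Class name: Flowable
Method name: window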

About Flowable.window

The following description is the Javadoc of the window(long count) overload.

Returns a Flowable that emits windows of items it collects from the source Publisher. The resulting Publisher emits connected, non-overlapping windows, each containing count items. When the source Publisher completes or encounters an error, the resulting Publisher emits the current window and propagates the notification from the source Publisher.

Backpressure: The operator honors backpressure from its inner and outer subscribers; however, the inner Publisher uses an unbounded buffer that may hold at most count elements.
Scheduler: This version of window does not operate by default on a particular Scheduler.
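To make the count-based behavior concrete, here is a minimal sketch that reduces each window to its sum. It assumes RxJava 2.x on the classpath and Java 8 lambdas; the class name WindowSumExample and the method sumPerWindow are hypothetical names chosen for illustration, not part of RxJava.

```java
import io.reactivex.Flowable;
import java.util.List;

public class WindowSumExample {
    // Hypothetical helper: splits the source into consecutive, non-overlapping
    // windows of `count` items and reduces every window to the sum of its items.
    static List<Integer> sumPerWindow(Flowable<Integer> source, int count) {
        return source
                .window(count)                                          // Flowable<Flowable<Integer>>
                .concatMap(w -> w.reduce(0, Integer::sum).toFlowable()) // one sum per window, in order
                .toList()
                .blockingGet();
    }

    public static void main(String[] args) {
        // Windows are [1, 2], [3, 4] and the final, shorter window [5].
        System.out.println(sumPerWindow(Flowable.range(1, 5), 2)); // prints [3, 7, 5]
    }
}
```

The last window holds only one item because the source completes before the window fills up; the operator still emits it, as the description states.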

Code examples

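Code example source: ReactiveX/RxJava

// Excerpt: the enclosing call that receives this Function is truncated in the source.
new Function<Flowable<Object>, Flowable<Flowable<Object>>>() {
  @Override
  public Flowable<Flowable<Object>> apply(Flowable<Object> f) throws Exception {
    // Emit every item in its own window of size 1.
    return f.window(1);
  }
}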

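Code example source: ReactiveX/RxJava

// Excerpt: the enclosing call that receives this Function is truncated in the source.
new Function<Flowable<Object>, Flowable<Flowable<Object>>>() {
  @Override
  public Flowable<Flowable<Object>> apply(Flowable<Object> f) throws Exception {
    // count = 1, skip = 2: a one-item window starts at every second item.
    return f.window(1, 2);
  }
}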

Code example source: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void windowOpenCloseOpenNull() {
  just1.window(null, new Function<Object, Publisher<Integer>>() {
    @Override
    public Publisher<Integer> apply(Object v) {
      return just1;
    }
  });
}

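Code example source: ReactiveX/RxJava

// Excerpt: the enclosing call is truncated in the source; it receives this Function
// followed by the arguments false, 1, 1, 1.
new Function<Flowable<Object>, Object>() {
  @Override
  public Object apply(Flowable<Object> f) throws Exception {
    // The boundary never emits, so all items end up in a single window.
    return f.window(Flowable.never()).flatMap(new Function<Flowable<Object>, Flowable<Object>>() {
      @Override
      public Flowable<Object> apply(Flowable<Object> v) throws Exception {
        return v;
      }
    });
  }
}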
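The window(Publisher) overload used above closes the current window and opens a new one each time the boundary Publisher emits; with Flowable.never() as the boundary, everything stays in one window. The following sketch drives the boundary by hand to show the split. It assumes RxJava 2.x and Java 8 lambdas; the class name WindowBoundaryExample, the method run, and the "tick" signal are hypothetical.

```java
import io.reactivex.processors.PublishProcessor;
import java.util.ArrayList;
import java.util.List;

public class WindowBoundaryExample {
    // Hypothetical helper: pushes items and one boundary signal by hand and
    // returns the contents of each window as a list.
    static List<List<Integer>> run() {
        PublishProcessor<Integer> source = PublishProcessor.create();
        PublishProcessor<String> boundary = PublishProcessor.create();
        List<List<Integer>> result = new ArrayList<>();

        source.window(boundary)
              .concatMap(w -> w.toList().toFlowable()) // collect each window in order
              .subscribe(result::add);

        source.onNext(1);
        source.onNext(2);
        boundary.onNext("tick"); // closes the first window, opens the second
        source.onNext(3);
        source.onComplete();     // closes the second window and the outer sequence
        return result;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Everything here runs synchronously on the calling thread, which is why the result list can be read right after onComplete() without waiting.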

Code example source: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void windowBoundarySupplierReturnsNull() {
  just1.window(new Callable<Publisher<Object>>() {
    @Override
    public Publisher<Object> call() {
      return null;
    }
  }).blockingSubscribe();
}

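Code example source: ReactiveX/RxJava

// Excerpt: the enclosing call that receives this Function is truncated in the source.
new Function<Flowable<Object>, Publisher<Flowable<Object>>>() {
  @Override
  public Publisher<Flowable<Object>> apply(Flowable<Object> f) throws Exception {
    // Keep only the last window emitted before the source terminates.
    return f.window(Flowable.never()).takeLast(1);
  }
}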

Code example source: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void windowOpenCloseCloseReturnsNull() {
  Flowable.never().window(just1, new Function<Integer, Publisher<Object>>() {
    @Override
    public Publisher<Object> apply(Integer v) {
      return null;
    }
  }).blockingSubscribe();
}

Code example source: ReactiveX/RxJava

@Test
public void invalidSpan() {
  try {
    Flowable.just(1).window(-99, 1, TimeUnit.SECONDS);
    fail("Should have thrown!");
  } catch (IllegalArgumentException ex) {
    assertEquals("timespan > 0 required but it was -99", ex.getMessage());
  }
}

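Code example source: ReactiveX/RxJava

// Excerpt: the enclosing call is truncated in the source; it receives this Function
// followed by the arguments false, 1, 1, 1.
new Function<Flowable<Object>, Object>() {
  @Override
  public Object apply(Flowable<Object> f) throws Exception {
    // The boundary supplier returns a Publisher that never emits.
    return f.window(Functions.justCallable(Flowable.never())).flatMap(new Function<Flowable<Object>, Flowable<Object>>() {
      @Override
      public Flowable<Object> apply(Flowable<Object> v) throws Exception {
        return v;
      }
    });
  }
}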

Code example source: ReactiveX/RxJava

@Test
public void testNonOverlappingWindows() {
  Flowable<String> subject = Flowable.just("one", "two", "three", "four", "five");
  Flowable<Flowable<String>> windowed = subject.window(3);
  List<List<String>> windows = toLists(windowed);
  assertEquals(2, windows.size());
  assertEquals(list("one", "two", "three"), windows.get(0));
  assertEquals(list("four", "five"), windows.get(1));
}
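The test above calls two helpers, toLists and list, that are not included in the excerpt. The sketch below is one plausible way to write them, assuming RxJava 2.x and Java 8 lambdas; it is a stand-in for illustration and not necessarily the code used in the RxJava test suite. The class name WindowTestHelpers is hypothetical.

```java
import io.reactivex.Flowable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class WindowTestHelpers {
    // Assumed stand-in for list(...): builds a List from the given items.
    @SafeVarargs
    static <T> List<T> list(T... items) {
        return new ArrayList<>(Arrays.asList(items));
    }

    // Assumed stand-in for toLists(...): collects every window into a List,
    // preserving window order, and blocks until the outer sequence completes.
    static <T> List<List<T>> toLists(Flowable<Flowable<T>> windows) {
        return windows
                .concatMap(w -> w.toList().toFlowable())
                .toList()
                .blockingGet();
    }

    public static void main(String[] args) {
        Flowable<String> subject = Flowable.just("one", "two", "three", "four", "five");
        // prints [[one, two, three], [four, five]]
        System.out.println(toLists(subject.window(3)));
    }
}
```

concatMap is used rather than flatMap so that the lists come out in the same order as the windows were emitted.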

Code example source: ReactiveX/RxJava

@Test
public void testSkipAndCountGaplessWindows() {
  Flowable<String> subject = Flowable.just("one", "two", "three", "four", "five");
  Flowable<Flowable<String>> windowed = subject.window(3, 3);
  List<List<String>> windows = toLists(windowed);
  assertEquals(2, windows.size());
  assertEquals(list("one", "two", "three"), windows.get(0));
  assertEquals(list("four", "five"), windows.get(1));
}
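The test above uses skip equal to count, which gives the same gapless result as window(count). As a minimal sketch of the other two cases, the example below also tries skip greater than count (items between windows are dropped) and skip smaller than count (windows overlap). It assumes RxJava 2.x and Java 8 lambdas; the class name WindowSkipExample and the method windows are hypothetical.

```java
import io.reactivex.Flowable;
import java.util.List;

public class WindowSkipExample {
    // Hypothetical helper: windows the numbers 1..5 with the given count and skip
    // and returns the contents of each window as a list.
    static List<List<Integer>> windows(int count, int skip) {
        return Flowable.range(1, 5)
                .window(count, skip)                     // a new window starts every `skip` items
                .concatMap(w -> w.toList().toFlowable()) // collect each window in order
                .toList()
                .blockingGet();
    }

    public static void main(String[] args) {
        System.out.println(windows(3, 3)); // gapless:  [[1, 2, 3], [4, 5]]
        System.out.println(windows(2, 3)); // gaps:     [[1, 2], [4, 5]] (3 is dropped)
        System.out.println(windows(3, 1)); // overlap:  [[1, 2, 3], [2, 3, 4], [3, 4, 5], [4, 5], [5]]
    }
}
```

In the overlapping case a window opens at every item, so the windows opened near the end of the source are shorter than count.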

Code example source: ReactiveX/RxJava

@SuppressWarnings("unchecked")
@Test
public void firstWindowMissingBackpressure() {
  Flowable.never()
  .window(1, TimeUnit.SECONDS, 1)
  .test(0L)
  .assertFailure(MissingBackpressureException.class);
}

Code example source: ReactiveX/RxJava

@Test
public void timespanTimeskipCustomSchedulerBufferSize() {
  Flowable.range(1, 10)
  .window(1, 1, TimeUnit.MINUTES, Schedulers.io(), 2)
  .flatMap(Functions.<Flowable<Integer>>identity())
  .test()
  .awaitDone(5, TimeUnit.SECONDS)
  .assertResult(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
}

Code example source: ReactiveX/RxJava

@Test
public void mainError() {
  Flowable.error(new TestException())
  .window(Functions.justCallable(Flowable.never()))
  .test()
  .assertError(TestException.class);
}

Code example source: ReactiveX/RxJava

@Test
public void timespanTimeskipDefaultScheduler() {
  Flowable.just(1)
  .window(1, 1, TimeUnit.MINUTES)
  .flatMap(Functions.<Flowable<Integer>>identity())
  .test()
  .awaitDone(5, TimeUnit.SECONDS)
  .assertResult(1);
}

Code example source: ReactiveX/RxJava

@Test
public void restartTimer() {
  Flowable.range(1, 5)
  .window(1, TimeUnit.DAYS, Schedulers.single(), 2, true)
  .flatMap(Functions.<Flowable<Integer>>identity())
  .test()
  .assertResult(1, 2, 3, 4, 5);
}

Code example source: ReactiveX/RxJava

@Test
public void boundaryOnError() {
  TestSubscriber<Object> ts = Flowable.error(new TestException())
  .window(Flowable.never())
  .flatMap(Functions.<Flowable<Object>>identity(), true)
  .test()
  .assertFailure(CompositeException.class);
  List<Throwable> errors = TestHelper.compositeList(ts.errors().get(0));
  TestHelper.assertError(errors, 0, TestException.class);
}

Code example source: ReactiveX/RxJava

@Test
public void timeskipJustOverlap() {
  Flowable.just(1)
  .window(2, 1, TimeUnit.MINUTES, Schedulers.single())
  .flatMap(Functions.<Flowable<Integer>>identity())
  .test()
  .awaitDone(5, TimeUnit.SECONDS)
  .assertResult(1);
}

Code example source: ReactiveX/RxJava

@Test
public void exactBoundaryError() {
  Flowable.error(new TestException())
  .window(1, TimeUnit.DAYS, Schedulers.single(), 2, true)
  .test()
  .assertSubscribed()
  .assertError(TestException.class)
  .assertNotComplete();
}

Code example source: ReactiveX/RxJava

@Test
public void mainError() {
  Flowable.<Integer>error(new TestException())
  .window(Flowable.never(), Functions.justFunction(Flowable.just(1)))
  .flatMap(Functions.<Flowable<Integer>>identity())
  .test()
  .assertFailure(TestException.class);
}
