Usage and code examples of the io.reactivex.Observable.generate() method

x33g5p2x, reposted on 2022-01-25 under Other

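This article collects code examples for the Java method io.reactivex.Observable.generate() and shows how Observable.generate() is used in practice. The examples come mainly from GitHub, Stack Overflow, Maven and similar platforms, were extracted from selected projects, and are intended as reference material. The details of the Observable.generate() method are as follows:
Package path: io.reactivex.Observable
Class name: Observable
Method name: generate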

About Observable.generate

Returns a cold, synchronous and stateless generator of values.

Scheduler: generate does not operate by default on a particular Scheduler.
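To make the description above concrete, here is a minimal sketch of the stateless, single-argument overload. It assumes RxJava 2.x (the `io.reactivex` package) is on the classpath; the class name `StatelessGenerateExample` and the method `firstTicks` are hypothetical names chosen for illustration only.

```java
import io.reactivex.Emitter;
import io.reactivex.Observable;
import io.reactivex.functions.Consumer;

import java.util.List;

// Hypothetical example class, not part of RxJava.
class StatelessGenerateExample {

    // Collects the first n items of an otherwise endless generator.
    static List<String> firstTicks(int n) {
        return Observable.generate(new Consumer<Emitter<String>>() {
            @Override
            public void accept(Emitter<String> emitter) {
                // Exactly one signal per invocation: a single onNext here.
                emitter.onNext("tick");
            }
        })
        .take(n)        // disposes the generator after n items
        .toList()
        .blockingGet(); // generate is synchronous, so this returns promptly
    }

    public static void main(String[] args) {
        System.out.println(firstTicks(3)); // prints [tick, tick, tick]
    }
}
```

Because the generator never calls `onComplete`, something downstream (here `take`) has to bound the sequence.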

Code examples

Code example source: ReactiveX/RxJava

    // A null initial-state supplier is rejected with a NullPointerException.
    @Test(expected = NullPointerException.class)
    public void generateStateFunctionInitialStateNull() {
        Observable.generate(null, new BiFunction<Object, Emitter<Object>, Object>() {
            @Override
            public Object apply(Object s, Emitter<Object> o) { o.onNext(1); return s; }
        });
    }

Code example source: ReactiveX/RxJava

    // A null BiConsumer generator is rejected with a NullPointerException.
    @Test(expected = NullPointerException.class)
    public void generateStateConsumerNull() {
        Observable.generate(new Callable<Integer>() {
            @Override
            public Integer call() {
                return 1;
            }
        }, (BiConsumer<Integer, Emitter<Object>>)null);
    }

Code example source: ReactiveX/RxJava

    // A null initial-state supplier is rejected even when the generator is valid.
    @Test(expected = NullPointerException.class)
    public void generateStateConsumerInitialStateNull() {
        BiConsumer<Integer, Emitter<Integer>> generator = new BiConsumer<Integer, Emitter<Integer>>() {
            @Override
            public void accept(Integer s, Emitter<Integer> o) {
                o.onNext(1);
            }
        };
        Observable.generate(null, generator);
    }

Code example source: ReactiveX/RxJava

    // A null state-disposal consumer (third argument) is rejected with a NullPointerException.
    @Test(expected = NullPointerException.class)
    public void generateFunctionDisposeNull() {
        Observable.generate(new Callable<Object>() {
            @Override
            public Object call() {
                return 1;
            }
        }, new BiFunction<Object, Emitter<Object>, Object>() {
            @Override
            public Object apply(Object s, Emitter<Object> o) { o.onNext(1); return s; }
        }, null);
    }

Code example source: ReactiveX/RxJava

    // A null generator passed to the single-argument overload is rejected.
    @Test(expected = NullPointerException.class)
    public void generateConsumerNull() {
        Observable.generate(null);
    }

Code example source: ReactiveX/RxJava

    // A null state-disposal consumer is rejected for the BiConsumer overload as well.
    @Test(expected = NullPointerException.class)
    public void generateConsumerDisposeNull() {
        BiConsumer<Integer, Emitter<Integer>> generator = new BiConsumer<Integer, Emitter<Integer>>() {
            @Override
            public void accept(Integer s, Emitter<Integer> o) {
                o.onNext(1);
            }
        };
        Observable.generate(new Callable<Integer>() {
            @Override
            public Integer call() {
                return 1;
            }
        }, generator, null);
    }

Code example source: ReactiveX/RxJava

    // EventConsumer is defined elsewhere in the source project and is not shown here.
    public static Observable<Event> getEventStream(final String type, final int numInstances) {
        return Observable.<Event>generate(new EventConsumer(numInstances, type)).subscribeOn(Schedulers.newThread());
    }

Code example source: ReactiveX/RxJava

    /**
     * Returns a cold, synchronous and stateful generator of values.
     * <p>
     * <img width="640" height="315" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/generate.2.png" alt="">
     * <p>
     * Note that the {@link Emitter#onNext}, {@link Emitter#onError} and
     * {@link Emitter#onComplete} methods provided to the function via the {@link Emitter} instance should be called synchronously,
     * never concurrently and only while the function body is executing. Calling them from multiple threads
     * or outside the function call is not supported and leads to an undefined behavior.
     * <dl>
     *  <dt><b>Scheduler:</b></dt>
     *  <dd>{@code generate} does not operate by default on a particular {@link Scheduler}.</dd>
     * </dl>
     *
     * @param <S> the type of the per-Observer state
     * @param <T> the generated value type
     * @param initialState the Callable to generate the initial state for each Observer
     * @param generator the Function called with the current state whenever a particular downstream Observer has
     * requested a value. The callback then should call {@code onNext}, {@code onError} or
     * {@code onComplete} to signal a value or a terminal event and should return a (new) state for
     * the next invocation. Signalling multiple {@code onNext}
     * in a call will make the operator signal {@code IllegalStateException}.
     * @return the new Observable instance
     */
    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.NONE)
    public static <T, S> Observable<T> generate(Callable<S> initialState, BiFunction<S, Emitter<T>, S> generator) {
        return generate(initialState, generator, Functions.emptyConsumer());
    }
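The stateful overload documented above can be sketched as a simple counter. This is an illustrative example under the assumption that RxJava 2.x is on the classpath; the class `StatefulGenerateExample` and the method `countUpTo` are hypothetical names, not part of the library.

```java
import io.reactivex.Emitter;
import io.reactivex.Observable;
import io.reactivex.functions.BiFunction;

import java.util.List;
import java.util.concurrent.Callable;

// Hypothetical example class, not part of RxJava.
class StatefulGenerateExample {

    // Emits 1, 2, ..., limit and then completes.
    static List<Integer> countUpTo(final int limit) {
        return Observable.generate(
            new Callable<Integer>() {
                @Override
                public Integer call() {
                    return 1; // initial per-Observer state
                }
            },
            new BiFunction<Integer, Emitter<Integer>, Integer>() {
                @Override
                public Integer apply(Integer state, Emitter<Integer> emitter) {
                    // Signal exactly once per call: either a value or completion.
                    if (state > limit) {
                        emitter.onComplete();
                    } else {
                        emitter.onNext(state);
                    }
                    return state + 1; // state handed to the next invocation
                }
            })
            .toList()
            .blockingGet();
    }

    public static void main(String[] args) {
        System.out.println(countUpTo(3)); // prints [1, 2, 3]
    }
}
```

Returning the next state from the function keeps the counter local to each subscription, so two Observers subscribing to the same Observable each start from 1.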

Code example source: ReactiveX/RxJava

    // The initial-state supplier may return null; only the supplier itself must be non-null.
    @Test
    public void generateFunctionStateNullAllowed() {
        Observable.generate(new Callable<Object>() {
            @Override
            public Object call() {
                return null;
            }
        }, new BiFunction<Object, Emitter<Object>, Object>() {
            @Override
            public Object apply(Object s, Emitter<Object> o) { o.onComplete(); return s; }
        }).blockingSubscribe();
    }

Code example source: ReactiveX/RxJava

    // Emitting a null item results in a NullPointerException.
    @Test(expected = NullPointerException.class)
    public void generateConsumerEmitsNull() {
        Observable.generate(new Consumer<Emitter<Object>>() {
            @Override
            public void accept(Emitter<Object> s) {
                s.onNext(null);
            }
        }).blockingLast();
    }

Code example source: ReactiveX/RxJava

    // A null state value is also allowed with the BiConsumer overload.
    @Test
    public void generateConsumerStateNullAllowed() {
        BiConsumer<Integer, Emitter<Integer>> generator = new BiConsumer<Integer, Emitter<Integer>>() {
            @Override
            public void accept(Integer s, Emitter<Integer> o) {
                o.onComplete();
            }
        };
        Observable.generate(new Callable<Integer>() {
            @Override
            public Integer call() {
                return null;
            }
        }, generator).blockingSubscribe();
    }

Code example source: ReactiveX/RxJava

    // Calling onComplete twice still yields a single, empty completion.
    @Test
    public void multipleOnComplete() {
        Observable.generate(new Consumer<Emitter<Object>>() {
            @Override
            public void accept(Emitter<Object> e) throws Exception {
                e.onComplete();
                e.onComplete();
            }
        })
        .test()
        .assertResult();
    }

Code example source: ReactiveX/RxJava

    // A second onNext within one generator call fails with IllegalStateException after the first item.
    @Test
    public void multipleOnNext() {
        Observable.generate(new Consumer<Emitter<Object>>() {
            @Override
            public void accept(Emitter<Object> e) throws Exception {
                e.onNext(1);
                e.onNext(2);
            }
        })
        .test()
        .assertFailure(IllegalStateException.class, 1);
    }

Code example source: ReactiveX/RxJava

    // TestHelper is a utility from the RxJava test sources.
    @Test
    public void dispose() {
        TestHelper.checkDisposed(Observable.generate(new Callable<Object>() {
            @Override
            public Object call() throws Exception {
                return 1;
            }
        }, new BiConsumer<Object, Emitter<Object>>() {
            @Override
            public void accept(Object s, Emitter<Object> e) throws Exception {
                e.onComplete();
            }
        }, Functions.emptyConsumer()));
    }

Code example source: ReactiveX/RxJava

    // An exception thrown by the generator is delivered to the Observer as onError.
    @Test
    public void generatorThrows() {
        Observable.generate(new Callable<Object>() {
            @Override
            public Object call() throws Exception {
                return 1;
            }
        }, new BiConsumer<Object, Emitter<Object>>() {
            @Override
            public void accept(Object s, Emitter<Object> e) throws Exception {
                throw new TestException();
            }
        }, Functions.emptyConsumer())
        .test()
        .assertFailure(TestException.class);
    }

Code example source: ReactiveX/RxJava

    // An exception thrown by the initial-state supplier is delivered as onError.
    @Test
    public void stateSupplierThrows() {
        Observable.generate(new Callable<Object>() {
            @Override
            public Object call() throws Exception {
                throw new TestException();
            }
        }, new BiConsumer<Object, Emitter<Object>>() {
            @Override
            public void accept(Object s, Emitter<Object> e) throws Exception {
                e.onNext(s);
            }
        }, Functions.emptyConsumer())
        .test()
        .assertFailure(TestException.class);
    }

Code example source: ReactiveX/RxJava

    // With a BiConsumer the state is not replaced between calls, so 10 is emitted every time.
    @Test
    public void statefulBiconsumer() {
        Observable.generate(new Callable<Object>() {
            @Override
            public Object call() throws Exception {
                return 10;
            }
        }, new BiConsumer<Object, Emitter<Object>>() {
            @Override
            public void accept(Object s, Emitter<Object> e) throws Exception {
                e.onNext(s);
            }
        }, new Consumer<Object>() {
            @Override
            public void accept(Object d) throws Exception {
            }
        })
        .take(5)
        .test()
        .assertResult(10, 10, 10, 10, 10);
    }

Code example source: ReactiveX/RxJava

    // Only the first onError reaches the Observer; the second is reported as undeliverable.
    @Test
    public void multipleOnError() {
        List<Throwable> errors = TestHelper.trackPluginErrors();
        try {
            Observable.generate(new Consumer<Emitter<Object>>() {
                @Override
                public void accept(Emitter<Object> e) throws Exception {
                    e.onError(new TestException("First"));
                    e.onError(new TestException("Second"));
                }
            })
            .test()
            .assertFailure(TestException.class);
            TestHelper.assertUndeliverable(errors, 0, TestException.class, "Second");
        } finally {
            RxJavaPlugins.reset();
        }
    }

Code example source: ReactiveX/RxJava

    // onError(null) is signalled downstream as a NullPointerException; it is not thrown
    // back into the generator, so the catch block never runs and call[0] stays 0.
    @Test
    public void nullError() {
        final int[] call = { 0 };
        Observable.generate(Functions.justCallable(1),
            new BiConsumer<Integer, Emitter<Object>>() {
                @Override
                public void accept(Integer s, Emitter<Object> e) throws Exception {
                    try {
                        e.onError(null);
                    } catch (NullPointerException ex) {
                        call[0]++;
                    }
                }
            }, Functions.emptyConsumer())
        .test()
        .assertFailure(NullPointerException.class);
        assertEquals(0, call[0]);
    }

Code example source: ReactiveX/RxJava

    // An exception from the state-disposal consumer does not reach the Observer;
    // the sequence completes normally and the error is reported as undeliverable.
    @Test
    public void disposerThrows() {
        List<Throwable> errors = TestHelper.trackPluginErrors();
        try {
            Observable.generate(new Callable<Object>() {
                @Override
                public Object call() throws Exception {
                    return 1;
                }
            }, new BiConsumer<Object, Emitter<Object>>() {
                @Override
                public void accept(Object s, Emitter<Object> e) throws Exception {
                    e.onComplete();
                }
            }, new Consumer<Object>() {
                @Override
                public void accept(Object d) throws Exception {
                    throw new TestException();
                }
            })
            .test()
            .assertResult();
            TestHelper.assertUndeliverable(errors, 0, TestException.class);
        } finally {
            RxJavaPlugins.reset();
        }
    }
