Usage and code examples of the io.reactivex.Observable.generate() method

Reposted by x33g5p2x on 2022-01-25 in Other

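This article collects code examples for the Java method io.reactivex.Observable.generate() and shows how Observable.generate() is used in practice. The examples come mainly from GitHub, Stack Overflow, and Maven, were extracted from selected projects, and are meant as a practical reference. Details of the Observable.generate() method:
Package path: io.reactivex.Observable
Class name: Observable
Method name: generate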

Introduction to Observable.generate

Returns a cold, synchronous and stateless generator of values.

Scheduler: generate does not operate by default on a particular Scheduler.
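To make "synchronous" and "no particular Scheduler" concrete, here is a minimal plain-Java sketch that models a single subscription to a stateless generator. It does not use RxJava and is not RxJava's implementation: `MiniEmitter`, `subscribe`, `threadNames`, and the `limit` parameter are hypothetical names introduced only for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Plain-Java model of a stateless generator; NOT RxJava's implementation. */
public class StatelessGenerateSketch {

    /** Hypothetical stand-in for io.reactivex.Emitter, reduced to two signals. */
    public interface MiniEmitter<T> {
        void onNext(T value);
        void onComplete();
    }

    /**
     * Models one subscription: the generator is called repeatedly on the
     * calling thread until it completes or `limit` values were received.
     */
    public static <T> List<T> subscribe(Consumer<MiniEmitter<T>> generator, int limit) {
        List<T> received = new ArrayList<>();
        boolean[] done = { false };
        MiniEmitter<T> emitter = new MiniEmitter<T>() {
            @Override public void onNext(T value) { received.add(value); }
            @Override public void onComplete() { done[0] = true; }
        };
        while (!done[0] && received.size() < limit) {
            generator.accept(emitter); // runs synchronously, no thread hop
        }
        return received;
    }

    /** Each generated value is the name of the thread the generator ran on. */
    public static List<String> threadNames(int limit) {
        Consumer<MiniEmitter<String>> generator =
                e -> e.onNext(Thread.currentThread().getName());
        return subscribe(generator, limit);
    }
}
```

In this model every value is produced on the thread that called `subscribe`, which mirrors the statement that generate does not move work to a Scheduler on its own. In RxJava itself, limiting the number of items would typically be done with an operator such as `take`, as the `statefulBiconsumer` test further down does.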

Code examples

Code example source: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void generateStateFunctionInitialStateNull() {
  Observable.generate(null, new BiFunction<Object, Emitter<Object>, Object>() {
    @Override
    public Object apply(Object s, Emitter<Object> o) { o.onNext(1); return s; }
  });
}

Code example source: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void generateStateConsumerNull() {
  Observable.generate(new Callable<Integer>() {
    @Override
    public Integer call() {
      return 1;
    }
  }, (BiConsumer<Integer, Emitter<Object>>)null);
}

Code example source: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void generateStateConsumerInitialStateNull() {
  BiConsumer<Integer, Emitter<Integer>> generator = new BiConsumer<Integer, Emitter<Integer>>() {
    @Override
    public void accept(Integer s, Emitter<Integer> o) {
      o.onNext(1);
    }
  };
  Observable.generate(null, generator);
}

Code example source: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void generateFunctionDisposeNull() {
  Observable.generate(new Callable<Object>() {
    @Override
    public Object call() {
      return 1;
    }
  }, new BiFunction<Object, Emitter<Object>, Object>() {
    @Override
    public Object apply(Object s, Emitter<Object> o) { o.onNext(1); return s; }
  }, null);
}

Code example source: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void generateConsumerNull() {
  Observable.generate(null);
}

Code example source: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void generateConsumerDisposeNull() {
  BiConsumer<Integer, Emitter<Integer>> generator = new BiConsumer<Integer, Emitter<Integer>>() {
    @Override
    public void accept(Integer s, Emitter<Integer> o) {
      o.onNext(1);
    }
  };
  Observable.generate(new Callable<Integer>() {
    @Override
    public Integer call() {
      return 1;
    }
  }, generator, null);
}

Code example source: ReactiveX/RxJava

public static Observable<Event> getEventStream(final String type, final int numInstances) {
  return Observable.<Event>generate(new EventConsumer(numInstances, type)).subscribeOn(Schedulers.newThread());
}

Code example source: ReactiveX/RxJava

/**
 * Returns a cold, synchronous and stateful generator of values.
 * <p>
 * <img width="640" height="315" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/generate.2.png" alt="">
 * <p>
 * Note that the {@link Emitter#onNext}, {@link Emitter#onError} and
 * {@link Emitter#onComplete} methods provided to the function via the {@link Emitter} instance should be called synchronously,
 * never concurrently and only while the function body is executing. Calling them from multiple threads
 * or outside the function call is not supported and leads to an undefined behavior.
 * <dl>
 *  <dt><b>Scheduler:</b></dt>
 *  <dd>{@code generate} does not operate by default on a particular {@link Scheduler}.</dd>
 * </dl>
 *
 * @param <S> the type of the per-Observer state
 * @param <T> the generated value type
 * @param initialState the Callable to generate the initial state for each Observer
 * @param generator the Function called with the current state whenever a particular downstream Observer has
 * requested a value. The callback then should call {@code onNext}, {@code onError} or
 * {@code onComplete} to signal a value or a terminal event and should return a (new) state for
 * the next invocation. Signalling multiple {@code onNext}
 * in a call will make the operator signal {@code IllegalStateException}.
 * @return the new Observable instance
 */
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public static <T, S> Observable<T> generate(Callable<S> initialState, BiFunction<S, Emitter<T>, S> generator) {
  return generate(initialState, generator, Functions.emptyConsumer());
}
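The contract described in the Javadoc above (state returned by one generator call is passed to the next, and at most one `onNext` per call) can be sketched in plain Java. This is only a model under stated assumptions, not RxJava's implementation: `MiniEmitter`, `run`, `firstFive`, and `limit` are hypothetical names, `Supplier` stands in for `Callable` to avoid checked exceptions, and the model throws `IllegalStateException` directly where the Javadoc says the operator signals it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.Supplier;

/** Plain-Java model of the stateful generate contract; NOT RxJava's implementation. */
public class GenerateModel {

    /** Hypothetical stand-in for io.reactivex.Emitter, reduced to two signals. */
    public interface MiniEmitter<T> {
        void onNext(T value);
        void onComplete();
    }

    /**
     * Calls the generator once per requested item and feeds the state it
     * returns into the next call. Stops after `limit` items or onComplete.
     */
    public static <S, T> List<T> run(Supplier<S> initialState,
                                     BiFunction<S, MiniEmitter<T>, S> generator,
                                     int limit) {
        List<T> out = new ArrayList<>();
        boolean[] done = { false };
        boolean[] emitted = { false };
        MiniEmitter<T> emitter = new MiniEmitter<T>() {
            @Override public void onNext(T value) {
                if (emitted[0]) {
                    // Model of the "multiple onNext in one call" rule.
                    throw new IllegalStateException("onNext already called in this generator call");
                }
                emitted[0] = true;
                out.add(value);
            }
            @Override public void onComplete() { done[0] = true; }
        };
        S state = initialState.get();          // per-run initial state
        while (!done[0] && out.size() < limit) {
            emitted[0] = false;                // new call, one onNext allowed again
            state = generator.apply(state, emitter);
        }
        return out;
    }

    /** Counter kept in the state: each call emits it and returns state + 1. */
    public static List<Integer> firstFive() {
        return GenerateModel.<Integer, Integer>run(
                () -> 1,
                (s, e) -> { e.onNext(s); return s + 1; },
                5);
    }
}
```

Because the counter lives in the state rather than in a captured variable, every call to `run` starts again from the initial state, which is the per-Observer behavior the `initialState` parameter documents.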

Code example source: ReactiveX/RxJava

@Test
public void generateFunctionStateNullAllowed() {
  Observable.generate(new Callable<Object>() {
    @Override
    public Object call() {
      return null;
    }
  }, new BiFunction<Object, Emitter<Object>, Object>() {
    @Override
    public Object apply(Object s, Emitter<Object> o) { o.onComplete(); return s; }
  }).blockingSubscribe();
}

Code example source: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void generateConsumerEmitsNull() {
  Observable.generate(new Consumer<Emitter<Object>>() {
    @Override
    public void accept(Emitter<Object> s) {
      s.onNext(null);
    }
  }).blockingLast();
}

Code example source: ReactiveX/RxJava

@Test
public void generateConsumerStateNullAllowed() {
  BiConsumer<Integer, Emitter<Integer>> generator = new BiConsumer<Integer, Emitter<Integer>>() {
    @Override
    public void accept(Integer s, Emitter<Integer> o) {
      o.onComplete();
    }
  };
  Observable.generate(new Callable<Integer>() {
    @Override
    public Integer call() {
      return null;
    }
  }, generator).blockingSubscribe();
}

Code example source: ReactiveX/RxJava

@Test
public void multipleOnComplete() {
  Observable.generate(new Consumer<Emitter<Object>>() {
    @Override
    public void accept(Emitter<Object> e) throws Exception {
      e.onComplete();
      e.onComplete();
    }
  })
  .test()
  .assertResult();
}

Code example source: ReactiveX/RxJava

@Test
public void multipleOnNext() {
  Observable.generate(new Consumer<Emitter<Object>>() {
    @Override
    public void accept(Emitter<Object> e) throws Exception {
      e.onNext(1);
      e.onNext(2);
    }
  })
  .test()
  .assertFailure(IllegalStateException.class, 1);
}

Code example source: ReactiveX/RxJava

@Test
public void dispose() {
  TestHelper.checkDisposed(Observable.generate(new Callable<Object>() {
      @Override
      public Object call() throws Exception {
        return 1;
      }
    }, new BiConsumer<Object, Emitter<Object>>() {
      @Override
      public void accept(Object s, Emitter<Object> e) throws Exception {
        e.onComplete();
      }
    }, Functions.emptyConsumer()));
}

Code example source: ReactiveX/RxJava

@Test
public void generatorThrows() {
  Observable.generate(new Callable<Object>() {
    @Override
    public Object call() throws Exception {
      return 1;
    }
  }, new BiConsumer<Object, Emitter<Object>>() {
    @Override
    public void accept(Object s, Emitter<Object> e) throws Exception {
      throw new TestException();
    }
  }, Functions.emptyConsumer())
  .test()
  .assertFailure(TestException.class);
}

Code example source: ReactiveX/RxJava

@Test
public void stateSupplierThrows() {
  Observable.generate(new Callable<Object>() {
    @Override
    public Object call() throws Exception {
      throw new TestException();
    }
  }, new BiConsumer<Object, Emitter<Object>>() {
    @Override
    public void accept(Object s, Emitter<Object> e) throws Exception {
      e.onNext(s);
    }
  }, Functions.emptyConsumer())
  .test()
  .assertFailure(TestException.class);
}

Code example source: ReactiveX/RxJava

@Test
public void statefulBiconsumer() {
  Observable.generate(new Callable<Object>() {
    @Override
    public Object call() throws Exception {
      return 10;
    }
  }, new BiConsumer<Object, Emitter<Object>>() {
    @Override
    public void accept(Object s, Emitter<Object> e) throws Exception {
      e.onNext(s);
    }
  }, new Consumer<Object>() {
    @Override
    public void accept(Object d) throws Exception {
    }
  })
  .take(5)
  .test()
  .assertResult(10, 10, 10, 10, 10);
}

Code example source: ReactiveX/RxJava

@Test
public void multipleOnError() {
  List<Throwable> errors = TestHelper.trackPluginErrors();
  try {
    Observable.generate(new Consumer<Emitter<Object>>() {
      @Override
      public void accept(Emitter<Object> e) throws Exception {
        e.onError(new TestException("First"));
        e.onError(new TestException("Second"));
      }
    })
    .test()
    .assertFailure(TestException.class);
    TestHelper.assertUndeliverable(errors, 0, TestException.class, "Second");
  } finally {
    RxJavaPlugins.reset();
  }
}

Code example source: ReactiveX/RxJava

@Test
public void nullError() {
  final int[] call = { 0 };
  Observable.generate(Functions.justCallable(1),
  new BiConsumer<Integer, Emitter<Object>>() {
    @Override
    public void accept(Integer s, Emitter<Object> e) throws Exception {
      try {
        e.onError(null);
      } catch (NullPointerException ex) {
        call[0]++;
      }
    }
  }, Functions.emptyConsumer())
  .test()
  .assertFailure(NullPointerException.class);
  assertEquals(0, call[0]);
}

Code example source: ReactiveX/RxJava

@Test
public void disposerThrows() {
  List<Throwable> errors = TestHelper.trackPluginErrors();
  try {
    Observable.generate(new Callable<Object>() {
      @Override
      public Object call() throws Exception {
        return 1;
      }
    }, new BiConsumer<Object, Emitter<Object>>() {
      @Override
      public void accept(Object s, Emitter<Object> e) throws Exception {
        e.onComplete();
      }
    }, new Consumer<Object>() {
      @Override
      public void accept(Object d) throws Exception {
        throw new TestException();
      }
    })
    .test()
    .assertResult();
    TestHelper.assertUndeliverable(errors, 0, TestException.class);
  } finally {
    RxJavaPlugins.reset();
  }
}
