io.reactivex.Observable.create()方法的使用及代码示例

x33g5p2x  于2022-01-25 转载在 其他  
字(9.3k)|赞(0)|评价(0)|浏览(209)

本文整理了Java中io.reactivex.Observable.create()方法的一些代码示例,展示了Observable.create()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Observable.create()方法的具体详情如下:
包路径:io.reactivex.Observable
类名称:Observable
方法名:create

Observable.create介绍

[英]Provides an API (via a cold Observable) that bridges the reactive world with the callback-style world.

Example:

  1. Observable.<Event>create(emitter -> {
  2. Callback listener = new Callback() {
  3. @Override
  4. public void onEvent(Event e) {
  5. emitter.onNext(e);
  6. if (e.isLast()) {
  7. emitter.onComplete();
  8. }
  9. }
  10. @Override
  11. public void onFailure(Exception e) {
  12. emitter.onError(e);
  13. }
  14. };
  15. AutoCloseable c = api.someMethod(listener);
  16. emitter.setCancellable(c::close);
  17. });

You should call the ObservableEmitter's onNext, onError and onComplete methods in a serialized fashion. The rest of its methods are thread-safe. Scheduler: create does not operate by default on a particular Scheduler.
[中]提供了一个API(通过一个冷可观察的API),将反应式世界与回调式世界连接起来。
例子:

  1. Observable.<Event>create(emitter -> {
  2. Callback listener = new Callback() {
  3. @Override
  4. public void onEvent(Event e) {
  5. emitter.onNext(e);
  6. if (e.isLast()) {
  7. emitter.onComplete();
  8. }
  9. }
  10. @Override
  11. public void onFailure(Exception e) {
  12. emitter.onError(e);
  13. }
  14. };
  15. AutoCloseable c = api.someMethod(listener);
  16. emitter.setCancellable(c::close);
  17. });

您应该以序列化的方式调用ObservieMitter的onNext、onError和onComplete方法。它的其他方法是线程安全的。调度程序:默认情况下,创建不会在特定调度程序上运行。

代码示例

代码示例来源:origin: ReactiveX/RxJava

  1. @Override
  2. public ObservableSource<? extends Object> apply(Integer v) throws Exception {
  3. return Observable.create(new ObservableOnSubscribe<Object>() {
  4. @Override
  5. public void subscribe(ObservableEmitter<Object> emitter) throws Exception {
  6. while (!emitter.isDisposed()) {
  7. Thread.sleep(100);
  8. }
  9. interrupted.set(true);
  10. }
  11. });
  12. }
  13. })

代码示例来源:origin: ReactiveX/RxJava

  1. @Test(expected = NullPointerException.class)
  2. public void nullArgument() {
  3. Observable.create(null);
  4. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void nullThrowable() {
  3. Observable.create(new ObservableOnSubscribe<Object>() {
  4. @Override
  5. public void subscribe(ObservableEmitter<Object> e) throws Exception {
  6. e.onError(null);
  7. }
  8. })
  9. .test()
  10. .assertFailure(NullPointerException.class);
  11. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void nullValueSync() {
  3. Observable.create(new ObservableOnSubscribe<Object>() {
  4. @Override
  5. public void subscribe(ObservableEmitter<Object> e) throws Exception {
  6. e.serialize().onNext(null);
  7. }
  8. })
  9. .test()
  10. .assertFailure(NullPointerException.class);
  11. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void nullThrowableSync() {
  3. Observable.create(new ObservableOnSubscribe<Object>() {
  4. @Override
  5. public void subscribe(ObservableEmitter<Object> e) throws Exception {
  6. e.serialize().onError(null);
  7. }
  8. })
  9. .test()
  10. .assertFailure(NullPointerException.class);
  11. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void callbackThrows() {
  3. Observable.create(new ObservableOnSubscribe<Object>() {
  4. @Override
  5. public void subscribe(ObservableEmitter<Object> e) throws Exception {
  6. throw new TestException();
  7. }
  8. })
  9. .test()
  10. .assertFailure(TestException.class);
  11. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void emitterHasToString() {
  3. Observable.create(new ObservableOnSubscribe<Object>() {
  4. @Override
  5. public void subscribe(ObservableEmitter<Object> emitter) throws Exception {
  6. assertTrue(emitter.toString().contains(ObservableCreate.CreateEmitter.class.getSimpleName()));
  7. assertTrue(emitter.serialize().toString().contains(ObservableCreate.CreateEmitter.class.getSimpleName()));
  8. }
  9. }).test().assertEmpty();
  10. }
  11. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void nullValue() {
  3. Observable.create(new ObservableOnSubscribe<Object>() {
  4. @Override
  5. public void subscribe(ObservableEmitter<Object> e) throws Exception {
  6. e.onNext(null);
  7. }
  8. })
  9. .test()
  10. .assertFailure(NullPointerException.class);
  11. }

代码示例来源:origin: ReactiveX/RxJava

  1. @SuppressWarnings("unchecked")
  2. @Test
  3. public void noSubsequentSubscriptionIterable() {
  4. final int[] calls = { 0 };
  5. Observable<Integer> source = Observable.create(new ObservableOnSubscribe<Integer>() {
  6. @Override
  7. public void subscribe(ObservableEmitter<Integer> s) throws Exception {
  8. calls[0]++;
  9. s.onNext(1);
  10. s.onComplete();
  11. }
  12. });
  13. Observable.concat(Arrays.asList(source, source)).firstElement()
  14. .test()
  15. .assertResult(1);
  16. assertEquals(1, calls[0]);
  17. }

代码示例来源:origin: ReactiveX/RxJava

  1. @SuppressWarnings("unchecked")
  2. @Test
  3. public void noSubsequentSubscriptionDelayErrorIterable() {
  4. final int[] calls = { 0 };
  5. Observable<Integer> source = Observable.create(new ObservableOnSubscribe<Integer>() {
  6. @Override
  7. public void subscribe(ObservableEmitter<Integer> s) throws Exception {
  8. calls[0]++;
  9. s.onNext(1);
  10. s.onComplete();
  11. }
  12. });
  13. Observable.concatDelayError(Arrays.asList(source, source)).firstElement()
  14. .test()
  15. .assertResult(1);
  16. assertEquals(1, calls[0]);
  17. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void createNullValue() {
  3. final Throwable[] error = { null };
  4. Observable.create(new ObservableOnSubscribe<Integer>() {
  5. @Override
  6. public void subscribe(ObservableEmitter<Integer> e) throws Exception {
  7. try {
  8. e.onNext(null);
  9. e.onNext(1);
  10. e.onError(new TestException());
  11. e.onComplete();
  12. } catch (Throwable ex) {
  13. error[0] = ex;
  14. }
  15. }
  16. })
  17. .test()
  18. .assertFailure(NullPointerException.class);
  19. assertNull(error[0]);
  20. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void createNullValueSerialized() {
  3. final Throwable[] error = { null };
  4. Observable.create(new ObservableOnSubscribe<Integer>() {
  5. @Override
  6. public void subscribe(ObservableEmitter<Integer> e) throws Exception {
  7. e = e.serialize();
  8. try {
  9. e.onNext(null);
  10. e.onNext(1);
  11. e.onError(new TestException());
  12. e.onComplete();
  13. } catch (Throwable ex) {
  14. error[0] = ex;
  15. }
  16. }
  17. })
  18. .test()
  19. .assertFailure(NullPointerException.class);
  20. assertNull(error[0]);
  21. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void tryOnError() {
  3. List<Throwable> errors = TestHelper.trackPluginErrors();
  4. try {
  5. final Boolean[] response = { null };
  6. Observable.create(new ObservableOnSubscribe<Object>() {
  7. @Override
  8. public void subscribe(ObservableEmitter<Object> e) throws Exception {
  9. e.onNext(1);
  10. response[0] = e.tryOnError(new TestException());
  11. }
  12. })
  13. .take(1)
  14. .test()
  15. .assertResult(1);
  16. assertFalse(response[0]);
  17. assertTrue(errors.toString(), errors.isEmpty());
  18. } finally {
  19. RxJavaPlugins.reset();
  20. }
  21. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void tryOnErrorSerialized() {
  3. List<Throwable> errors = TestHelper.trackPluginErrors();
  4. try {
  5. final Boolean[] response = { null };
  6. Observable.create(new ObservableOnSubscribe<Object>() {
  7. @Override
  8. public void subscribe(ObservableEmitter<Object> e) throws Exception {
  9. e = e.serialize();
  10. e.onNext(1);
  11. response[0] = e.tryOnError(new TestException());
  12. }
  13. })
  14. .take(1)
  15. .test()
  16. .assertResult(1);
  17. assertFalse(response[0]);
  18. assertTrue(errors.toString(), errors.isEmpty());
  19. } finally {
  20. RxJavaPlugins.reset();
  21. }
  22. }

代码示例来源:origin: ReactiveX/RxJava

  1. @SuppressWarnings("unchecked")
  2. @Test
  3. public void noSubsequentSubscription() {
  4. final int[] calls = { 0 };
  5. Observable<Integer> source = Observable.create(new ObservableOnSubscribe<Integer>() {
  6. @Override
  7. public void subscribe(ObservableEmitter<Integer> s) throws Exception {
  8. calls[0]++;
  9. s.onNext(1);
  10. s.onComplete();
  11. }
  12. });
  13. Observable.concatArray(source, source).firstElement()
  14. .test()
  15. .assertResult(1);
  16. assertEquals(1, calls[0]);
  17. }

代码示例来源:origin: ReactiveX/RxJava

  1. @SuppressWarnings("unchecked")
  2. @Test
  3. public void noSubsequentSubscriptionDelayError() {
  4. final int[] calls = { 0 };
  5. Observable<Integer> source = Observable.create(new ObservableOnSubscribe<Integer>() {
  6. @Override
  7. public void subscribe(ObservableEmitter<Integer> s) throws Exception {
  8. calls[0]++;
  9. s.onNext(1);
  10. s.onComplete();
  11. }
  12. });
  13. Observable.concatArrayDelayError(source, source).firstElement()
  14. .test()
  15. .assertResult(1);
  16. assertEquals(1, calls[0]);
  17. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void basic() {
  3. final Disposable d = Disposables.empty();
  4. Observable.<Integer>create(new ObservableOnSubscribe<Integer>() {
  5. @Override
  6. public void subscribe(ObservableEmitter<Integer> e) throws Exception {
  7. e.setDisposable(d);
  8. e.onNext(1);
  9. e.onNext(2);
  10. e.onNext(3);
  11. e.onComplete();
  12. e.onError(new TestException());
  13. e.onNext(4);
  14. e.onError(new TestException());
  15. e.onComplete();
  16. }
  17. })
  18. .test()
  19. .assertResult(1, 2, 3);
  20. assertTrue(d.isDisposed());
  21. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void basicWithError() {
  3. final Disposable d = Disposables.empty();
  4. Observable.<Integer>create(new ObservableOnSubscribe<Integer>() {
  5. @Override
  6. public void subscribe(ObservableEmitter<Integer> e) throws Exception {
  7. e.setDisposable(d);
  8. e.onNext(1);
  9. e.onNext(2);
  10. e.onNext(3);
  11. e.onError(new TestException());
  12. e.onComplete();
  13. e.onNext(4);
  14. e.onError(new TestException());
  15. }
  16. })
  17. .test()
  18. .assertFailure(TestException.class, 1, 2, 3);
  19. assertTrue(d.isDisposed());
  20. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void basicWithErrorSerialized() {
  3. final Disposable d = Disposables.empty();
  4. Observable.<Integer>create(new ObservableOnSubscribe<Integer>() {
  5. @Override
  6. public void subscribe(ObservableEmitter<Integer> e) throws Exception {
  7. e = e.serialize();
  8. e.setDisposable(d);
  9. e.onNext(1);
  10. e.onNext(2);
  11. e.onNext(3);
  12. e.onError(new TestException());
  13. e.onComplete();
  14. e.onNext(4);
  15. e.onError(new TestException());
  16. }
  17. })
  18. .test()
  19. .assertFailure(TestException.class, 1, 2, 3);
  20. assertTrue(d.isDisposed());
  21. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void serializedConcurrentOnNext() {
  3. Observable.create(new ObservableOnSubscribe<Object>() {
  4. @Override
  5. public void subscribe(ObservableEmitter<Object> e) throws Exception {
  6. final ObservableEmitter<Object> f = e.serialize();
  7. Runnable r1 = new Runnable() {
  8. @Override
  9. public void run() {
  10. for (int i = 0; i < TestHelper.RACE_DEFAULT_LOOPS; i++) {
  11. f.onNext(1);
  12. }
  13. }
  14. };
  15. TestHelper.race(r1, r1);
  16. }
  17. })
  18. .take(TestHelper.RACE_DEFAULT_LOOPS)
  19. .test()
  20. .assertSubscribed().assertValueCount(TestHelper.RACE_DEFAULT_LOOPS).assertComplete().assertNoErrors();
  21. }

相关文章

Observable类方法