io.reactivex.Observable.create(): usage and code examples

x33g5p2x, reposted on 2022-01-25 under "Other"

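This article collects code examples for the Java method io.reactivex.Observable.create() and shows how Observable.create() is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they should serve as a useful reference. Details of Observable.create() are as follows:
Package: io.reactivex
Class name: Observable
Method name: create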

About Observable.create

Provides an API (via a cold Observable) that bridges the reactive world with the callback-style world.

Example:

Observable.<Event>create(emitter -> {
  Callback listener = new Callback() {
    @Override
    public void onEvent(Event e) {
      emitter.onNext(e);
      if (e.isLast()) {
        emitter.onComplete();
      }
    }

    @Override
    public void onFailure(Exception e) {
      emitter.onError(e);
    }
  };

  AutoCloseable c = api.someMethod(listener);

  emitter.setCancellable(c::close);
});

You should call the ObservableEmitter's onNext, onError and onComplete methods in a serialized fashion. The rest of its methods are thread-safe.

Scheduler: create does not operate by default on a particular Scheduler.
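
To make "serialized fashion" concrete, here is a minimal, library-free sketch of the idea: signals arriving from several threads are forwarded one at a time, and nothing is forwarded after a terminal signal. SerializedSink and emitFromTwoThreads are hypothetical names invented for this illustration. They are not RxJava classes, and the sketch does not claim to mirror RxJava's internals. In RxJava 2 itself, calling serialize() on the ObservableEmitter (as several of the examples on this page do) is the supported way to emit safely from multiple threads.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

final class SerializedSinkSketch {

    /** Hypothetical wrapper (not an RxJava type) that lets only one signal through at a time. */
    static final class SerializedSink<T> {
        private final Consumer<T> downstream;
        private boolean done;

        SerializedSink(Consumer<T> downstream) {
            this.downstream = downstream;
        }

        // synchronized: concurrent callers take turns, so downstream never sees overlapping calls
        synchronized void onNext(T value) {
            if (!done) {
                downstream.accept(value);
            }
        }

        // After the terminal signal, later onNext calls are ignored.
        synchronized void onComplete() {
            done = true;
        }
    }

    /** Emits perThread values from each of two threads and returns how many were delivered. */
    static int emitFromTwoThreads(int perThread) {
        // ArrayList is not thread-safe; it is only touched while holding the sink's lock.
        List<Integer> received = new ArrayList<>();
        SerializedSink<Integer> sink = new SerializedSink<Integer>(received::add);

        Runnable producer = () -> {
            for (int i = 0; i < perThread; i++) {
                sink.onNext(i);
            }
        };
        Thread first = new Thread(producer);
        Thread second = new Thread(producer);
        first.start();
        second.start();
        try {
            first.join();
            second.join();
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for producers", ex);
        }

        sink.onComplete();
        sink.onNext(-1); // ignored: the sink has already completed
        return received.size();
    }

    public static void main(String[] args) {
        System.out.println(emitFromTwoThreads(1000)); // prints 2000
    }
}
```

Without the synchronized methods, the two producers could call the non-thread-safe list concurrently and lose or corrupt values, which is the kind of problem the serialization requirement guards against.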

Code examples

Example source: ReactiveX/RxJava

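// Excerpt: a mapper callback from the enclosing test (not shown).
// `interrupted` is a flag declared by that test, presumably an AtomicBoolean.
@Override
public ObservableSource<? extends Object> apply(Integer v) throws Exception {
  return Observable.create(new ObservableOnSubscribe<Object>() {
    @Override
    public void subscribe(ObservableEmitter<Object> emitter) throws Exception {
      // Poll until the downstream disposes the emitter.
      while (!emitter.isDisposed()) {
        Thread.sleep(100);
      }
      interrupted.set(true);
    }
  });
}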

Example source: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void nullArgument() {
  Observable.create(null);
}

Example source: ReactiveX/RxJava

@Test
public void nullThrowable() {
  Observable.create(new ObservableOnSubscribe<Object>() {
    @Override
    public void subscribe(ObservableEmitter<Object> e) throws Exception {
      e.onError(null);
    }
  })
  .test()
  .assertFailure(NullPointerException.class);
}

Example source: ReactiveX/RxJava

@Test
public void nullValueSync() {
  Observable.create(new ObservableOnSubscribe<Object>() {
    @Override
    public void subscribe(ObservableEmitter<Object> e) throws Exception {
      e.serialize().onNext(null);
    }
  })
  .test()
  .assertFailure(NullPointerException.class);
}

Example source: ReactiveX/RxJava

@Test
public void nullThrowableSync() {
  Observable.create(new ObservableOnSubscribe<Object>() {
    @Override
    public void subscribe(ObservableEmitter<Object> e) throws Exception {
      e.serialize().onError(null);
    }
  })
  .test()
  .assertFailure(NullPointerException.class);
}

Example source: ReactiveX/RxJava

@Test
public void callbackThrows() {
  Observable.create(new ObservableOnSubscribe<Object>() {
    @Override
    public void subscribe(ObservableEmitter<Object> e) throws Exception {
      throw new TestException();
    }
  })
  .test()
  .assertFailure(TestException.class);
}

Example source: ReactiveX/RxJava

@Test
public void emitterHasToString() {
  Observable.create(new ObservableOnSubscribe<Object>() {
    @Override
    public void subscribe(ObservableEmitter<Object> emitter) throws Exception {
      assertTrue(emitter.toString().contains(ObservableCreate.CreateEmitter.class.getSimpleName()));
      assertTrue(emitter.serialize().toString().contains(ObservableCreate.CreateEmitter.class.getSimpleName()));
    }
  })
  .test()
  .assertEmpty();
}

Example source: ReactiveX/RxJava

@Test
public void nullValue() {
  Observable.create(new ObservableOnSubscribe<Object>() {
    @Override
    public void subscribe(ObservableEmitter<Object> e) throws Exception {
      e.onNext(null);
    }
  })
  .test()
  .assertFailure(NullPointerException.class);
}

Example source: ReactiveX/RxJava

@SuppressWarnings("unchecked")
@Test
public void noSubsequentSubscriptionIterable() {
  final int[] calls = { 0 };
  Observable<Integer> source = Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(ObservableEmitter<Integer> s) throws Exception {
      calls[0]++;
      s.onNext(1);
      s.onComplete();
    }
  });
  Observable.concat(Arrays.asList(source, source)).firstElement()
  .test()
  .assertResult(1);
  assertEquals(1, calls[0]);
}

Example source: ReactiveX/RxJava

@SuppressWarnings("unchecked")
@Test
public void noSubsequentSubscriptionDelayErrorIterable() {
  final int[] calls = { 0 };
  Observable<Integer> source = Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(ObservableEmitter<Integer> s) throws Exception {
      calls[0]++;
      s.onNext(1);
      s.onComplete();
    }
  });
  Observable.concatDelayError(Arrays.asList(source, source)).firstElement()
  .test()
  .assertResult(1);
  assertEquals(1, calls[0]);
}

Example source: ReactiveX/RxJava

@Test
public void createNullValue() {
  final Throwable[] error = { null };
  Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(ObservableEmitter<Integer> e) throws Exception {
      try {
        e.onNext(null);
        e.onNext(1);
        e.onError(new TestException());
        e.onComplete();
      } catch (Throwable ex) {
        error[0] = ex;
      }
    }
  })
  .test()
  .assertFailure(NullPointerException.class);
  assertNull(error[0]);
}

Example source: ReactiveX/RxJava

@Test
public void createNullValueSerialized() {
  final Throwable[] error = { null };
  Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(ObservableEmitter<Integer> e) throws Exception {
      e = e.serialize();
      try {
        e.onNext(null);
        e.onNext(1);
        e.onError(new TestException());
        e.onComplete();
      } catch (Throwable ex) {
        error[0] = ex;
      }
    }
  })
  .test()
  .assertFailure(NullPointerException.class);
  assertNull(error[0]);
}

Example source: ReactiveX/RxJava

@Test
public void tryOnError() {
  List<Throwable> errors = TestHelper.trackPluginErrors();
  try {
    final Boolean[] response = { null };
    Observable.create(new ObservableOnSubscribe<Object>() {
      @Override
      public void subscribe(ObservableEmitter<Object> e) throws Exception {
        e.onNext(1);
        response[0] = e.tryOnError(new TestException());
      }
    })
    .take(1)
    .test()
    .assertResult(1);
    assertFalse(response[0]);
    assertTrue(errors.toString(), errors.isEmpty());
  } finally {
    RxJavaPlugins.reset();
  }
}

Example source: ReactiveX/RxJava

@Test
public void tryOnErrorSerialized() {
  List<Throwable> errors = TestHelper.trackPluginErrors();
  try {
    final Boolean[] response = { null };
    Observable.create(new ObservableOnSubscribe<Object>() {
      @Override
      public void subscribe(ObservableEmitter<Object> e) throws Exception {
        e = e.serialize();
        e.onNext(1);
        response[0] = e.tryOnError(new TestException());
      }
    })
    .take(1)
    .test()
    .assertResult(1);
    assertFalse(response[0]);
    assertTrue(errors.toString(), errors.isEmpty());
  } finally {
    RxJavaPlugins.reset();
  }
}

Example source: ReactiveX/RxJava

@SuppressWarnings("unchecked")
@Test
public void noSubsequentSubscription() {
  final int[] calls = { 0 };
  Observable<Integer> source = Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(ObservableEmitter<Integer> s) throws Exception {
      calls[0]++;
      s.onNext(1);
      s.onComplete();
    }
  });
  Observable.concatArray(source, source).firstElement()
  .test()
  .assertResult(1);
  assertEquals(1, calls[0]);
}

Example source: ReactiveX/RxJava

@SuppressWarnings("unchecked")
@Test
public void noSubsequentSubscriptionDelayError() {
  final int[] calls = { 0 };
  Observable<Integer> source = Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(ObservableEmitter<Integer> s) throws Exception {
      calls[0]++;
      s.onNext(1);
      s.onComplete();
    }
  });
  Observable.concatArrayDelayError(source, source).firstElement()
  .test()
  .assertResult(1);
  assertEquals(1, calls[0]);
}

Example source: ReactiveX/RxJava

@Test
public void basic() {
  final Disposable d = Disposables.empty();
  Observable.<Integer>create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(ObservableEmitter<Integer> e) throws Exception {
      e.setDisposable(d);
      e.onNext(1);
      e.onNext(2);
      e.onNext(3);
      e.onComplete();
      e.onError(new TestException());
      e.onNext(4);
      e.onError(new TestException());
      e.onComplete();
    }
  })
  .test()
  .assertResult(1, 2, 3);
  assertTrue(d.isDisposed());
}

Example source: ReactiveX/RxJava

@Test
public void basicWithError() {
  final Disposable d = Disposables.empty();
  Observable.<Integer>create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(ObservableEmitter<Integer> e) throws Exception {
      e.setDisposable(d);
      e.onNext(1);
      e.onNext(2);
      e.onNext(3);
      e.onError(new TestException());
      e.onComplete();
      e.onNext(4);
      e.onError(new TestException());
    }
  })
  .test()
  .assertFailure(TestException.class, 1, 2, 3);
  assertTrue(d.isDisposed());
}

Example source: ReactiveX/RxJava

@Test
public void basicWithErrorSerialized() {
  final Disposable d = Disposables.empty();
  Observable.<Integer>create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(ObservableEmitter<Integer> e) throws Exception {
      e = e.serialize();
      e.setDisposable(d);
      e.onNext(1);
      e.onNext(2);
      e.onNext(3);
      e.onError(new TestException());
      e.onComplete();
      e.onNext(4);
      e.onError(new TestException());
    }
  })
  .test()
  .assertFailure(TestException.class, 1, 2, 3);
  assertTrue(d.isDisposed());
}

Example source: ReactiveX/RxJava

@Test
public void serializedConcurrentOnNext() {
  Observable.create(new ObservableOnSubscribe<Object>() {
    @Override
    public void subscribe(ObservableEmitter<Object> e) throws Exception {
      final ObservableEmitter<Object> f = e.serialize();
      Runnable r1 = new Runnable() {
        @Override
        public void run() {
          for (int i = 0; i < TestHelper.RACE_DEFAULT_LOOPS; i++) {
            f.onNext(1);
          }
        }
      };
      TestHelper.race(r1, r1);
    }
  })
  .take(TestHelper.RACE_DEFAULT_LOOPS)
  .test()
  .assertSubscribed().assertValueCount(TestHelper.RACE_DEFAULT_LOOPS).assertComplete().assertNoErrors();
}
