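This article collects code examples for the Java method io.reactivex.Observable.create() and shows how Observable.create() is used in practice. The examples come mainly from GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they should serve as a useful reference. Details of Observable.create() are as follows:
Fully qualified name: io.reactivex.Observable
Class name: Observable
Method name: create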
Provides an API (via a cold Observable) that bridges the reactive world with the callback-style world.
Example:
Observable.<Event>create(emitter -> {
    Callback listener = new Callback() {
        @Override
        public void onEvent(Event e) {
            emitter.onNext(e);
            if (e.isLast()) {
                emitter.onComplete();
            }
        }
        @Override
        public void onFailure(Exception e) {
            emitter.onError(e);
        }
    };
    AutoCloseable c = api.someMethod(listener);
    emitter.setCancellable(c::close);
});
You should call the ObservableEmitter's onNext, onError and onComplete methods in a serialized fashion. The rest of its methods are thread-safe. Scheduler: create does not operate by default on a particular Scheduler.
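The Javadoc example above depends on types (Event, Callback, api) that are not defined here. As a minimal runnable sketch of the same bridging idea, assuming RxJava 2.x (the io.reactivex package) is on the classpath, the following uses a hypothetical TickSource class as the callback-style API. TickSource, ticks and demo are illustrative names, not part of RxJava.

```java
import io.reactivex.Observable;
import io.reactivex.disposables.Disposable;

import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class CreateBridgeSketch {

    // Hypothetical callback-style source (not part of RxJava); stands in for `api`.
    static final class TickSource {
        private Consumer<Integer> listener;

        void register(Consumer<Integer> l) { listener = l; }
        void unregister() { listener = null; }
        boolean hasListener() { return listener != null; }
        void fire(int value) {
            if (listener != null) { listener.accept(value); }
        }
    }

    // Bridges the callback API into a cold Observable: nothing is registered
    // until an observer subscribes, and disposing unregisters the listener.
    static Observable<Integer> ticks(TickSource source) {
        return Observable.create(emitter -> {
            source.register(emitter::onNext);
            emitter.setCancellable(source::unregister);
        });
    }

    static List<Integer> demo(TickSource source) {
        List<Integer> received = new ArrayList<>();
        Disposable d = ticks(source).subscribe(received::add);
        source.fire(1);
        source.fire(2);
        d.dispose();     // runs the Cancellable, which unregisters the listener
        source.fire(3);  // no listener is registered any more, so nothing is delivered
        return received;
    }

    public static void main(String[] args) {
        System.out.println(demo(new TickSource())); // prints [1, 2]
    }
}
```

Everything here runs on the calling thread, which matches the note that create does not operate on a particular Scheduler by default.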
Code example source: ReactiveX/RxJava
// Excerpt from a larger test: a mapper whose inner source polls until it is disposed.
// `interrupted` is declared in the enclosing test, which is not shown here.
@Override
public ObservableSource<? extends Object> apply(Integer v) throws Exception {
    return Observable.create(new ObservableOnSubscribe<Object>() {
        @Override
        public void subscribe(ObservableEmitter<Object> emitter) throws Exception {
            while (!emitter.isDisposed()) {
                Thread.sleep(100);
            }
            interrupted.set(true);
        }
    });
}
Code example source: ReactiveX/RxJava
// A null ObservableOnSubscribe is rejected with a NullPointerException.
@Test(expected = NullPointerException.class)
public void nullArgument() {
    Observable.create(null);
}
Code example source: ReactiveX/RxJava
// onError(null) reaches the observer as a NullPointerException.
@Test
public void nullThrowable() {
    Observable.create(new ObservableOnSubscribe<Object>() {
        @Override
        public void subscribe(ObservableEmitter<Object> e) throws Exception {
            e.onError(null);
        }
    })
    .test()
    .assertFailure(NullPointerException.class);
}
Code example source: ReactiveX/RxJava
// The serialized emitter also turns onNext(null) into a NullPointerException.
@Test
public void nullValueSync() {
    Observable.create(new ObservableOnSubscribe<Object>() {
        @Override
        public void subscribe(ObservableEmitter<Object> e) throws Exception {
            e.serialize().onNext(null);
        }
    })
    .test()
    .assertFailure(NullPointerException.class);
}
Code example source: ReactiveX/RxJava
// The serialized emitter also turns onError(null) into a NullPointerException.
@Test
public void nullThrowableSync() {
    Observable.create(new ObservableOnSubscribe<Object>() {
        @Override
        public void subscribe(ObservableEmitter<Object> e) throws Exception {
            e.serialize().onError(null);
        }
    })
    .test()
    .assertFailure(NullPointerException.class);
}
Code example source: ReactiveX/RxJava
// An exception thrown from subscribe() is delivered to the observer via onError.
@Test
public void callbackThrows() {
    Observable.create(new ObservableOnSubscribe<Object>() {
        @Override
        public void subscribe(ObservableEmitter<Object> e) throws Exception {
            throw new TestException();
        }
    })
    .test()
    .assertFailure(TestException.class);
}
Code example source: ReactiveX/RxJava
// Both the plain and the serialized emitter mention CreateEmitter in toString(); no events are emitted.
@Test
public void emitterHasToString() {
    Observable.create(new ObservableOnSubscribe<Object>() {
        @Override
        public void subscribe(ObservableEmitter<Object> emitter) throws Exception {
            assertTrue(emitter.toString().contains(ObservableCreate.CreateEmitter.class.getSimpleName()));
            assertTrue(emitter.serialize().toString().contains(ObservableCreate.CreateEmitter.class.getSimpleName()));
        }
    }).test().assertEmpty();
}
Code example source: ReactiveX/RxJava
// onNext(null) reaches the observer as a NullPointerException.
@Test
public void nullValue() {
    Observable.create(new ObservableOnSubscribe<Object>() {
        @Override
        public void subscribe(ObservableEmitter<Object> e) throws Exception {
            e.onNext(null);
        }
    })
    .test()
    .assertFailure(NullPointerException.class);
}
Code example source: ReactiveX/RxJava
// firstElement() finishes after the first item, so concat never subscribes to the second source.
@SuppressWarnings("unchecked")
@Test
public void noSubsequentSubscriptionIterable() {
    final int[] calls = { 0 };
    Observable<Integer> source = Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(ObservableEmitter<Integer> s) throws Exception {
            calls[0]++;
            s.onNext(1);
            s.onComplete();
        }
    });
    Observable.concat(Arrays.asList(source, source)).firstElement()
    .test()
    .assertResult(1);
    assertEquals(1, calls[0]);
}
Code example source: ReactiveX/RxJava
// Same check for concatDelayError: the source is subscribed to only once.
@SuppressWarnings("unchecked")
@Test
public void noSubsequentSubscriptionDelayErrorIterable() {
    final int[] calls = { 0 };
    Observable<Integer> source = Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(ObservableEmitter<Integer> s) throws Exception {
            calls[0]++;
            s.onNext(1);
            s.onComplete();
        }
    });
    Observable.concatDelayError(Arrays.asList(source, source)).firstElement()
    .test()
    .assertResult(1);
    assertEquals(1, calls[0]);
}
Code example source: ReactiveX/RxJava
// onNext(null) does not throw inside subscribe(); the NullPointerException goes to the observer
// and the later signals are dropped, so nothing is caught and error[0] stays null.
@Test
public void createNullValue() {
    final Throwable[] error = { null };
    Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(ObservableEmitter<Integer> e) throws Exception {
            try {
                e.onNext(null);
                e.onNext(1);
                e.onError(new TestException());
                e.onComplete();
            } catch (Throwable ex) {
                error[0] = ex;
            }
        }
    })
    .test()
    .assertFailure(NullPointerException.class);
    assertNull(error[0]);
}
Code example source: ReactiveX/RxJava
// Same scenario as createNullValue, this time through the serialized emitter.
@Test
public void createNullValueSerialized() {
    final Throwable[] error = { null };
    Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(ObservableEmitter<Integer> e) throws Exception {
            e = e.serialize();
            try {
                e.onNext(null);
                e.onNext(1);
                e.onError(new TestException());
                e.onComplete();
            } catch (Throwable ex) {
                error[0] = ex;
            }
        }
    })
    .test()
    .assertFailure(NullPointerException.class);
    assertNull(error[0]);
}
Code example source: ReactiveX/RxJava
// take(1) disposes the emitter after the first item, so tryOnError returns false
// and the exception is not forwarded to the RxJavaPlugins error handler.
@Test
public void tryOnError() {
    List<Throwable> errors = TestHelper.trackPluginErrors();
    try {
        final Boolean[] response = { null };
        Observable.create(new ObservableOnSubscribe<Object>() {
            @Override
            public void subscribe(ObservableEmitter<Object> e) throws Exception {
                e.onNext(1);
                response[0] = e.tryOnError(new TestException());
            }
        })
        .take(1)
        .test()
        .assertResult(1);
        assertFalse(response[0]);
        assertTrue(errors.toString(), errors.isEmpty());
    } finally {
        RxJavaPlugins.reset();
    }
}
Code example source: ReactiveX/RxJava
// Same scenario as tryOnError, this time through the serialized emitter.
@Test
public void tryOnErrorSerialized() {
    List<Throwable> errors = TestHelper.trackPluginErrors();
    try {
        final Boolean[] response = { null };
        Observable.create(new ObservableOnSubscribe<Object>() {
            @Override
            public void subscribe(ObservableEmitter<Object> e) throws Exception {
                e = e.serialize();
                e.onNext(1);
                response[0] = e.tryOnError(new TestException());
            }
        })
        .take(1)
        .test()
        .assertResult(1);
        assertFalse(response[0]);
        assertTrue(errors.toString(), errors.isEmpty());
    } finally {
        RxJavaPlugins.reset();
    }
}
Code example source: ReactiveX/RxJava
// Same check for concatArray: the source is subscribed to only once.
@SuppressWarnings("unchecked")
@Test
public void noSubsequentSubscription() {
    final int[] calls = { 0 };
    Observable<Integer> source = Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(ObservableEmitter<Integer> s) throws Exception {
            calls[0]++;
            s.onNext(1);
            s.onComplete();
        }
    });
    Observable.concatArray(source, source).firstElement()
    .test()
    .assertResult(1);
    assertEquals(1, calls[0]);
}
Code example source: ReactiveX/RxJava
// Same check for concatArrayDelayError: the source is subscribed to only once.
@SuppressWarnings("unchecked")
@Test
public void noSubsequentSubscriptionDelayError() {
    final int[] calls = { 0 };
    Observable<Integer> source = Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(ObservableEmitter<Integer> s) throws Exception {
            calls[0]++;
            s.onNext(1);
            s.onComplete();
        }
    });
    Observable.concatArrayDelayError(source, source).firstElement()
    .test()
    .assertResult(1);
    assertEquals(1, calls[0]);
}
Code example source: ReactiveX/RxJava
// Signals sent after onComplete() never reach the observer, and the attached Disposable is disposed.
@Test
public void basic() {
    final Disposable d = Disposables.empty();
    Observable.<Integer>create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(ObservableEmitter<Integer> e) throws Exception {
            e.setDisposable(d);
            e.onNext(1);
            e.onNext(2);
            e.onNext(3);
            e.onComplete();
            e.onError(new TestException());
            e.onNext(4);
            e.onError(new TestException());
            e.onComplete();
        }
    })
    .test()
    .assertResult(1, 2, 3);
    assertTrue(d.isDisposed());
}
Code example source: ReactiveX/RxJava
// Only the first onError() reaches the observer; later signals never do.
@Test
public void basicWithError() {
    final Disposable d = Disposables.empty();
    Observable.<Integer>create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(ObservableEmitter<Integer> e) throws Exception {
            e.setDisposable(d);
            e.onNext(1);
            e.onNext(2);
            e.onNext(3);
            e.onError(new TestException());
            e.onComplete();
            e.onNext(4);
            e.onError(new TestException());
        }
    })
    .test()
    .assertFailure(TestException.class, 1, 2, 3);
    assertTrue(d.isDisposed());
}
Code example source: ReactiveX/RxJava
// Same scenario as basicWithError, this time through the serialized emitter.
@Test
public void basicWithErrorSerialized() {
    final Disposable d = Disposables.empty();
    Observable.<Integer>create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(ObservableEmitter<Integer> e) throws Exception {
            e = e.serialize();
            e.setDisposable(d);
            e.onNext(1);
            e.onNext(2);
            e.onNext(3);
            e.onError(new TestException());
            e.onComplete();
            e.onNext(4);
            e.onError(new TestException());
        }
    })
    .test()
    .assertFailure(TestException.class, 1, 2, 3);
    assertTrue(d.isDisposed());
}
Code example source: ReactiveX/RxJava
// Two racing threads emit through the serialized emitter; take(...) completes after RACE_DEFAULT_LOOPS items.
@Test
public void serializedConcurrentOnNext() {
    Observable.create(new ObservableOnSubscribe<Object>() {
        @Override
        public void subscribe(ObservableEmitter<Object> e) throws Exception {
            final ObservableEmitter<Object> f = e.serialize();
            Runnable r1 = new Runnable() {
                @Override
                public void run() {
                    for (int i = 0; i < TestHelper.RACE_DEFAULT_LOOPS; i++) {
                        f.onNext(1);
                    }
                }
            };
            TestHelper.race(r1, r1);
        }
    })
    .take(TestHelper.RACE_DEFAULT_LOOPS)
    .test()
    .assertSubscribed().assertValueCount(TestHelper.RACE_DEFAULT_LOOPS).assertComplete().assertNoErrors();
}
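The test above relies on TestHelper, which belongs to RxJava's own test sources and is not available to library users. As a rough sketch of the same idea using only public API and plain threads, assuming RxJava 2.x is on the classpath, the following emits from two threads through a serialized emitter. SerializedEmitterSketch, PER_THREAD and collect are illustrative names, and the item count is arbitrary.

```java
import io.reactivex.Observable;
import io.reactivex.ObservableEmitter;

import java.util.List;

class SerializedEmitterSketch {

    // Arbitrary number of items each producer thread emits.
    static final int PER_THREAD = 1000;

    static List<Integer> collect() {
        return Observable.<Integer>create(emitter -> {
            // serialize() returns an emitter whose onNext/onError/onComplete
            // may be called from several threads at once.
            ObservableEmitter<Integer> safe = emitter.serialize();
            Runnable producer = () -> {
                for (int i = 0; i < PER_THREAD; i++) {
                    safe.onNext(i);
                }
            };
            Thread t1 = new Thread(producer);
            Thread t2 = new Thread(producer);
            t1.start();
            t2.start();
            // Wait for both producers before signalling completion.
            t1.join();
            t2.join();
            safe.onComplete();
        })
        .toList()
        .blockingGet();
    }

    public static void main(String[] args) {
        System.out.println(collect().size()); // prints 2000
    }
}
```

Without serialize(), the two threads could call onNext concurrently, which violates the requirement that these methods be called in a serialized fashion.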