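This article collects code examples for the Java method io.reactivex.Flowable.create() and shows how Flowable.create() is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they should serve as useful references. Details of the Flowable.create() method are as follows: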
Package: io.reactivex
Class name: Flowable
Method name: create
Provides an API (via a cold Flowable) that bridges the reactive world with the callback-style, generally non-backpressured world.
Example:
    Flowable.<Event>create(emitter -> {
        Callback listener = new Callback() {
            @Override
            public void onEvent(Event e) {
                emitter.onNext(e);
                if (e.isLast()) {
                    emitter.onComplete();
                }
            }
            @Override
            public void onFailure(Exception e) {
                emitter.onError(e);
            }
        };
        AutoCloseable c = api.someMethod(listener);
        emitter.setCancellable(c::close);
    }, BackpressureStrategy.BUFFER);
You should call the FlowableEmitter onNext, onError and onComplete methods in a serialized fashion. The rest of its methods are thread-safe.
Backpressure: The backpressure behavior is determined by the mode parameter.
Scheduler: create does not operate by default on a particular Scheduler.
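The Javadoc example above relies on `Event`, `Callback`, and `api`, none of which are defined. A minimal self-contained sketch of the same bridging idea follows, assuming RxJava 2 on the classpath; `EventSource`, `Callback`, and `CreateBridgeSketch` are hypothetical stand-ins invented for illustration, not RxJava types. All emitter calls happen on one thread, so they are trivially serialized.

```java
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import java.util.ArrayList;
import java.util.List;

final class CreateBridgeSketch {
    /** Hypothetical listener interface standing in for a callback-style API. */
    interface Callback {
        void onEvent(String event, boolean last);
    }

    /** Hypothetical event source: remembers one listener and reports when it was closed. */
    static final class EventSource {
        private Callback listener;
        boolean closed;

        AutoCloseable register(Callback l) {
            listener = l;
            return () -> closed = true; // releasing the registration
        }

        void fire(String event, boolean last) {
            if (listener != null) {
                listener.onEvent(event, last);
            }
        }
    }

    /** Bridges the callback API into a Flowable and collects what the subscriber sees. */
    static List<String> demo(EventSource api) {
        List<String> received = new ArrayList<>();
        Flowable.<String>create(emitter -> {
            AutoCloseable handle = api.register((event, last) -> {
                emitter.onNext(event);
                if (last) {
                    emitter.onComplete();
                }
            });
            // Runs when the flow terminates or the subscriber cancels.
            emitter.setCancellable(handle::close);
        }, BackpressureStrategy.BUFFER).subscribe(received::add);

        // Events arrive after subscription, all from the calling thread.
        api.fire("a", false);
        api.fire("b", false);
        api.fire("c", true);
        return received;
    }
}
```

If the callbacks could fire from several threads at once, calling `emitter.serialize()` and emitting through the returned emitter is one way to satisfy the serialization requirement.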
Code example source: ReactiveX/RxJava
    @Test(expected = NullPointerException.class)
    public void modeNull() {
        Flowable.create(new FlowableOnSubscribe<Object>() {
            @Override
            public void subscribe(FlowableEmitter<Object> s) throws Exception { }
        }, null);
    }
Code example source: ReactiveX/RxJava
    @Test
    public void normalError() {
        Flowable.create(source, BackpressureStrategy.ERROR).subscribe(ts);
        source.onNext(1);
        source.onNext(2);
        source.onComplete();
        ts.assertNoValues();
        ts.assertError(MissingBackpressureException.class);
        ts.assertNotComplete();
        Assert.assertEquals("create: could not emit value due to lack of requests", ts.errors().get(0).getMessage());
    }
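The tests in this listing use the fields `source`, `sourceNoCancel`, and `ts`, whose definitions are not shown. The sketch below is one way to reproduce the ERROR-strategy scenario on its own. It assumes `ts` is a TestSubscriber created with zero initial demand and that `source` simply captures the emitter for later use; `CapturingSource` and `ErrorStrategySketch` are hypothetical names and may differ from the helpers in the RxJava test suite.

```java
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.FlowableEmitter;
import io.reactivex.FlowableOnSubscribe;
import io.reactivex.subscribers.TestSubscriber;

final class ErrorStrategySketch {
    /** Hypothetical stand-in for the `source` field: stores the emitter so values can be pushed later. */
    static final class CapturingSource implements FlowableOnSubscribe<Integer> {
        FlowableEmitter<Integer> emitter;

        @Override
        public void subscribe(FlowableEmitter<Integer> e) {
            this.emitter = e;
        }
    }

    static TestSubscriber<Integer> run() {
        CapturingSource source = new CapturingSource();
        // Assumption: the subscriber starts with no demand (initial request of 0).
        TestSubscriber<Integer> ts = new TestSubscriber<>(0L);
        Flowable.create(source, BackpressureStrategy.ERROR).subscribe(ts);

        source.emitter.onNext(1);    // no outstanding request: signals MissingBackpressureException
        source.emitter.onNext(2);    // ignored, the sequence has already terminated
        source.emitter.onComplete(); // ignored as well
        return ts;
    }
}
```

With this setup the subscriber should see no values, no completion, and a single MissingBackpressureException, which matches the assertions in normalError.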
Code example source: ReactiveX/RxJava
    @Test
    public void completeInlineExactLatest() {
        Flowable.create(sourceNoCancel, BackpressureStrategy.LATEST).subscribe(ts);
        sourceNoCancel.onNext(1);
        sourceNoCancel.onComplete();
        ts.request(1);
        ts.assertValues(1);
        ts.assertNoErrors();
        ts.assertComplete();
    }
Code example source: ReactiveX/RxJava
    @Test
    public void completeInline() {
        Flowable.create(sourceNoCancel, BackpressureStrategy.BUFFER).subscribe(ts);
        sourceNoCancel.onNext(1);
        sourceNoCancel.onComplete();
        ts.request(2);
        ts.assertValues(1);
        ts.assertNoErrors();
        ts.assertComplete();
    }
Code example source: ReactiveX/RxJava
    @Test
    public void normalMissing() {
        Flowable.create(source, BackpressureStrategy.MISSING).subscribe(ts);
        source.onNext(1);
        source.onNext(2);
        source.onComplete();
        ts.assertValues(1, 2);
        ts.assertNoErrors();
        ts.assertComplete();
    }
Code example source: ReactiveX/RxJava
    @Test
    public void completeInlineLatest() {
        Flowable.create(sourceNoCancel, BackpressureStrategy.LATEST).subscribe(ts);
        sourceNoCancel.onNext(1);
        sourceNoCancel.onComplete();
        ts.request(2);
        ts.assertValues(1);
        ts.assertNoErrors();
        ts.assertComplete();
    }
Code example source: ReactiveX/RxJava
    @Test
    public void deferredRequest() {
        Flowable.create(source, BackpressureStrategy.BUFFER).subscribe(ts);
        source.onNext(1);
        source.onNext(2);
        source.onComplete();
        ts.request(2);
        ts.assertValues(1, 2);
        ts.assertNoErrors();
        ts.assertComplete();
    }
Code example source: ReactiveX/RxJava
    @Test
    public void errorInline() {
        Flowable.create(sourceNoCancel, BackpressureStrategy.BUFFER).subscribe(ts);
        sourceNoCancel.onNext(1);
        sourceNoCancel.onError(new TestException());
        ts.request(2);
        ts.assertValues(1);
        ts.assertError(TestException.class);
        ts.assertNotComplete();
    }
Code example source: ReactiveX/RxJava
    @Test
    public void normalMissingRequested() {
        Flowable.create(source, BackpressureStrategy.MISSING).subscribe(ts);
        ts.request(2);
        source.onNext(1);
        source.onNext(2);
        source.onComplete();
        ts.assertValues(1, 2);
        ts.assertNoErrors();
        ts.assertComplete();
    }
Code example source: ReactiveX/RxJava
    @Test
    public void requestExact() {
        Flowable.create(source, BackpressureStrategy.BUFFER).subscribe(ts);
        ts.request(2);
        source.onNext(1);
        source.onNext(2);
        source.onComplete();
        ts.assertValues(1, 2);
        ts.assertNoErrors();
        ts.assertComplete();
    }
Code example source: ReactiveX/RxJava
    @Test
    public void errorInlineLatest() {
        Flowable.create(sourceNoCancel, BackpressureStrategy.LATEST).subscribe(ts);
        sourceNoCancel.onNext(1);
        sourceNoCancel.onError(new TestException());
        ts.request(2);
        ts.assertValues(1);
        ts.assertError(TestException.class);
        ts.assertNotComplete();
    }
Code example source: ReactiveX/RxJava
    @Test
    public void takeOne() {
        Flowable.create(source, BackpressureStrategy.BUFFER).take(1).subscribe(ts);
        ts.request(2);
        source.onNext(1);
        source.onNext(2);
        source.onComplete();
        ts.assertValues(1);
        ts.assertNoErrors();
        ts.assertComplete();
    }
Code example source: ReactiveX/RxJava
    @Test
    public void takeOneNoCancel() {
        Flowable.create(sourceNoCancel, BackpressureStrategy.BUFFER).take(1).subscribe(ts);
        ts.request(2);
        sourceNoCancel.onNext(1);
        sourceNoCancel.onNext(2);
        sourceNoCancel.onComplete();
        ts.assertValues(1);
        ts.assertNoErrors();
        ts.assertComplete();
    }
Code example source: ReactiveX/RxJava
    @Test
    public void normalLatest() {
        Flowable.create(source, BackpressureStrategy.LATEST).subscribe(ts);
        source.onNext(1);
        source.onNext(2);
        source.onComplete();
        ts.assertNoValues();
        ts.request(1);
        ts.assertValues(2);
        ts.assertNoErrors();
        ts.assertComplete();
    }
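The tests above differ mainly in what happens to values emitted before the subscriber has requested anything. The following sketch runs the same sequence (emit 1, 2, 3, complete, then request 3) under a chosen strategy so the outcomes can be compared side by side. It assumes RxJava 2 and a TestSubscriber with zero initial demand; `LateDemandSketch` and `lateDemand` are hypothetical names introduced here.

```java
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.FlowableEmitter;
import io.reactivex.subscribers.TestSubscriber;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

final class LateDemandSketch {
    /** Emits 1, 2, 3 and completes before any demand exists, then requests 3 items. */
    static List<Integer> lateDemand(BackpressureStrategy mode) {
        AtomicReference<FlowableEmitter<Integer>> ref = new AtomicReference<>();
        TestSubscriber<Integer> ts = new TestSubscriber<>(0L); // no initial demand
        // The FlowableOnSubscribe only captures the emitter for later use.
        Flowable.<Integer>create(ref::set, mode).subscribe(ts);

        FlowableEmitter<Integer> emitter = ref.get();
        emitter.onNext(1);
        emitter.onNext(2);
        emitter.onNext(3);
        emitter.onComplete();

        ts.request(3); // demand arrives only after the source has finished
        return ts.values();
    }
}
```

Under these assumptions BUFFER should deliver all three values once demand arrives, LATEST only the most recent one, and DROP none, since every value was emitted while demand was zero.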
Code example source: ReactiveX/RxJava
    @Test
    public void errorMissing() {
        Flowable.create(source, BackpressureStrategy.MISSING).subscribe(ts);
        source.onNext(1);
        source.onNext(2);
        source.onError(new TestException());
        ts.request(1);
        ts.assertValues(1, 2);
        ts.assertError(TestException.class);
        ts.assertNotComplete();
    }
Code example source: ReactiveX/RxJava
    @Test
    public void takeNoCancel() {
        Flowable.create(sourceNoCancel, BackpressureStrategy.BUFFER).take(2).subscribe(ts);
        sourceNoCancel.onNext(1);
        sourceNoCancel.onNext(2);
        sourceNoCancel.onComplete();
        ts.request(2);
        ts.assertValues(1, 2);
        ts.assertNoErrors();
        ts.assertComplete();
    }
Code example source: ReactiveX/RxJava
    @Test
    public void unsubscribedMissing() {
        Flowable.create(source, BackpressureStrategy.MISSING).subscribe(ts);
        ts.cancel();
        source.onNext(1);
        source.onNext(2);
        source.onError(new TestException());
        ts.request(1);
        ts.assertNoValues();
        ts.assertNoErrors();
        ts.assertNotComplete();
    }
Code example source: ReactiveX/RxJava
    @Test
    public void errorLatest() {
        Flowable.create(source, BackpressureStrategy.LATEST).subscribe(ts);
        source.onNext(1);
        source.onNext(2);
        source.onError(new TestException());
        ts.assertNoValues();
        ts.request(1);
        ts.assertValues(2);
        ts.assertError(TestException.class);
        ts.assertNotComplete();
    }
Code example source: ReactiveX/RxJava
    @Test
    public void unsubscribedError() {
        Flowable.create(source, BackpressureStrategy.ERROR).subscribe(ts);
        ts.cancel();
        source.onNext(1);
        source.onNext(2);
        source.onError(new TestException());
        ts.request(1);
        ts.assertNoValues();
        ts.assertNoErrors();
        ts.assertNotComplete();
    }
Code example source: ReactiveX/RxJava
    @Test
    public void unsubscribedBuffer() {
        Flowable.create(source, BackpressureStrategy.BUFFER).subscribe(ts);
        ts.cancel();
        source.onNext(1);
        source.onNext(2);
        source.onError(new TestException());
        ts.request(1);
        ts.assertNoValues();
        ts.assertNoErrors();
        ts.assertNotComplete();
    }
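The three unsubscribed tests show that values pushed after cancellation never reach the subscriber. A related point, sketched below, is that cancelling also triggers whatever was registered through `setCancellable`, which is where a listener would normally be unregistered. The sketch assumes RxJava 2; `CancelSketch` and its `run` method are hypothetical names, and the returned flags exist only for illustration.

```java
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.FlowableEmitter;
import io.reactivex.subscribers.TestSubscriber;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

final class CancelSketch {
    /**
     * Returns three flags: { cleanup ran, emitter reports cancelled, subscriber saw no values }.
     */
    static boolean[] run() {
        AtomicBoolean released = new AtomicBoolean();
        AtomicReference<FlowableEmitter<Integer>> ref = new AtomicReference<>();
        TestSubscriber<Integer> ts = new TestSubscriber<>(0L);

        Flowable.<Integer>create(emitter -> {
            // Stand-in for unregistering a listener or closing a resource.
            emitter.setCancellable(() -> released.set(true));
            ref.set(emitter);
        }, BackpressureStrategy.BUFFER).subscribe(ts);

        ts.cancel();          // downstream cancels before anything is emitted
        ref.get().onNext(1);  // dropped: the emitter is already cancelled
        ts.request(1);        // has no effect after cancellation

        return new boolean[] {
            released.get(),
            ref.get().isCancelled(),
            ts.values().isEmpty()
        };
    }
}
```

Checking `emitter.isCancelled()` before doing expensive work in a callback is one way for a producer to stop early once the subscriber is gone.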