io.reactivex.Flowable.create(): usage and code examples

Reposted by x33g5p2x on 2022-01-19, filed under Other

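This article collects code examples for the Java method io.reactivex.Flowable.create() and shows how Flowable.create() is used in practice. The examples come mainly from GitHub, Stack Overflow, and Maven, extracted from selected projects, and are meant as a practical reference. Details of the Flowable.create() method:
Package: io.reactivex
Class name: Flowable
Method name: create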

About Flowable.create

Provides an API (via a cold Flowable) that bridges the reactive world with the callback-style, generally non-backpressured world.

Example:

Flowable.<Event>create(emitter -> {
  Callback listener = new Callback() {
    @Override
    public void onEvent(Event e) {
      emitter.onNext(e);
      if (e.isLast()) {
        emitter.onComplete();
      }
    }

    @Override
    public void onFailure(Exception e) {
      emitter.onError(e);
    }
  };
  AutoCloseable c = api.someMethod(listener);
  emitter.setCancellable(c::close);
}, BackpressureStrategy.BUFFER);

You should call the FlowableEmitter onNext, onError and onComplete methods in a serialized fashion. The rest of its methods are thread-safe.

Backpressure: The backpressure behavior is determined by the mode parameter.

Scheduler: create does not operate by default on a particular Scheduler.
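To make the effect of the mode parameter more concrete, here is a minimal sketch in plain Java that models what happens to items emitted while the consumer has no outstanding request. It is a toy model written for this article, not RxJava's implementation: the class BackpressureModel, its Mode enum, and the simulate helper are all hypothetical names, and only the four strategies that appear in the examples of this article are modeled.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Toy model of demand tracking (hypothetical, NOT RxJava's implementation). */
public class BackpressureModel {
    /** Mirrors the four BackpressureStrategy constants used in this article. */
    enum Mode { MISSING, ERROR, LATEST, BUFFER }

    private final Mode mode;
    private final Deque<Integer> pending = new ArrayDeque<>();
    private final List<Integer> delivered = new ArrayList<>();
    private long requested;   // outstanding downstream demand
    private String error;     // non-null once the stream has failed

    BackpressureModel(Mode mode) {
        this.mode = mode;
    }

    /** Producer side: called by the callback-style source. */
    void onNext(int value) {
        if (error != null) {
            return; // already terminated, ignore further items
        }
        if (mode == Mode.MISSING) {
            delivered.add(value); // no demand check at all
            return;
        }
        if (requested > 0 && pending.isEmpty()) {
            requested--;
            delivered.add(value); // demand is available, deliver directly
            return;
        }
        switch (mode) {
            case ERROR:
                error = "MissingBackpressureException"; // fail instead of queueing
                break;
            case LATEST:
                pending.clear(); // keep only the newest item
                pending.add(value);
                break;
            default: // BUFFER: queue everything
                pending.add(value);
        }
    }

    /** Consumer side: adds demand and drains whatever is waiting. */
    void request(long n) {
        requested += n;
        while (requested > 0 && !pending.isEmpty()) {
            requested--;
            delivered.add(pending.poll());
        }
    }

    List<Integer> delivered() {
        return delivered;
    }

    String error() {
        return error;
    }

    /** Emits all values with zero demand, then requests lateRequest items. */
    static BackpressureModel simulate(Mode mode, long lateRequest, int... values) {
        BackpressureModel m = new BackpressureModel(mode);
        for (int v : values) {
            m.onNext(v);
        }
        m.request(lateRequest);
        return m;
    }

    public static void main(String[] args) {
        for (Mode mode : Mode.values()) {
            BackpressureModel m = simulate(mode, 1, 1, 2);
            System.out.println(mode + " -> values=" + m.delivered() + ", error=" + m.error());
        }
    }
}
```

In this model, emitting 1 and 2 before any request and then requesting one item leaves LATEST with only 2, BUFFER with 1 (2 stays queued), ERROR with no values and a recorded failure, and MISSING with both values delivered regardless of demand. That lines up with the normalLatest, deferredRequest, normalError, and normalMissing tests in this article, but the real operator also handles cancellation, completion, and concurrency, which this sketch leaves out.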

Code examples

Code example source: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void modeNull() {
  Flowable.create(new FlowableOnSubscribe<Object>() {
    @Override
    public void subscribe(FlowableEmitter<Object> s) throws Exception { }
  }, null);
}

Code example source: ReactiveX/RxJava

@Test
public void normalError() {
  Flowable.create(source, BackpressureStrategy.ERROR).subscribe(ts);
  source.onNext(1);
  source.onNext(2);
  source.onComplete();
  ts.assertNoValues();
  ts.assertError(MissingBackpressureException.class);
  ts.assertNotComplete();
  Assert.assertEquals("create: could not emit value due to lack of requests", ts.errors().get(0).getMessage());
}

Code example source: ReactiveX/RxJava

@Test
public void completeInlineExactLatest() {
  Flowable.create(sourceNoCancel, BackpressureStrategy.LATEST).subscribe(ts);
  sourceNoCancel.onNext(1);
  sourceNoCancel.onComplete();
  ts.request(1);
  ts.assertValues(1);
  ts.assertNoErrors();
  ts.assertComplete();
}

Code example source: ReactiveX/RxJava

@Test
public void completeInline() {
  Flowable.create(sourceNoCancel, BackpressureStrategy.BUFFER).subscribe(ts);
  sourceNoCancel.onNext(1);
  sourceNoCancel.onComplete();
  ts.request(2);
  ts.assertValues(1);
  ts.assertNoErrors();
  ts.assertComplete();
}

Code example source: ReactiveX/RxJava

@Test
public void normalMissing() {
  Flowable.create(source, BackpressureStrategy.MISSING).subscribe(ts);
  source.onNext(1);
  source.onNext(2);
  source.onComplete();
  ts.assertValues(1, 2);
  ts.assertNoErrors();
  ts.assertComplete();
}

Code example source: ReactiveX/RxJava

@Test
public void completeInlineLatest() {
  Flowable.create(sourceNoCancel, BackpressureStrategy.LATEST).subscribe(ts);
  sourceNoCancel.onNext(1);
  sourceNoCancel.onComplete();
  ts.request(2);
  ts.assertValues(1);
  ts.assertNoErrors();
  ts.assertComplete();
}

Code example source: ReactiveX/RxJava

@Test
public void deferredRequest() {
  Flowable.create(source, BackpressureStrategy.BUFFER).subscribe(ts);
  source.onNext(1);
  source.onNext(2);
  source.onComplete();
  ts.request(2);
  ts.assertValues(1, 2);
  ts.assertNoErrors();
  ts.assertComplete();
}

Code example source: ReactiveX/RxJava

@Test
public void errorInline() {
  Flowable.create(sourceNoCancel, BackpressureStrategy.BUFFER).subscribe(ts);
  sourceNoCancel.onNext(1);
  sourceNoCancel.onError(new TestException());
  ts.request(2);
  ts.assertValues(1);
  ts.assertError(TestException.class);
  ts.assertNotComplete();
}

Code example source: ReactiveX/RxJava

@Test
public void normalMissingRequested() {
  Flowable.create(source, BackpressureStrategy.MISSING).subscribe(ts);
  ts.request(2);
  source.onNext(1);
  source.onNext(2);
  source.onComplete();
  ts.assertValues(1, 2);
  ts.assertNoErrors();
  ts.assertComplete();
}

Code example source: ReactiveX/RxJava

@Test
public void requestExact() {
  Flowable.create(source, BackpressureStrategy.BUFFER).subscribe(ts);
  ts.request(2);
  source.onNext(1);
  source.onNext(2);
  source.onComplete();
  ts.assertValues(1, 2);
  ts.assertNoErrors();
  ts.assertComplete();
}

Code example source: ReactiveX/RxJava

@Test
public void errorInlineLatest() {
  Flowable.create(sourceNoCancel, BackpressureStrategy.LATEST).subscribe(ts);
  sourceNoCancel.onNext(1);
  sourceNoCancel.onError(new TestException());
  ts.request(2);
  ts.assertValues(1);
  ts.assertError(TestException.class);
  ts.assertNotComplete();
}

Code example source: ReactiveX/RxJava

@Test
public void takeOne() {
  Flowable.create(source, BackpressureStrategy.BUFFER).take(1).subscribe(ts);
  ts.request(2);
  source.onNext(1);
  source.onNext(2);
  source.onComplete();
  ts.assertValues(1);
  ts.assertNoErrors();
  ts.assertComplete();
}

Code example source: ReactiveX/RxJava

@Test
public void takeOneNoCancel() {
  Flowable.create(sourceNoCancel, BackpressureStrategy.BUFFER).take(1).subscribe(ts);
  ts.request(2);
  sourceNoCancel.onNext(1);
  sourceNoCancel.onNext(2);
  sourceNoCancel.onComplete();
  ts.assertValues(1);
  ts.assertNoErrors();
  ts.assertComplete();
}

Code example source: ReactiveX/RxJava

@Test
public void normalLatest() {
  Flowable.create(source, BackpressureStrategy.LATEST).subscribe(ts);
  source.onNext(1);
  source.onNext(2);
  source.onComplete();
  ts.assertNoValues();
  ts.request(1);
  ts.assertValues(2);
  ts.assertNoErrors();
  ts.assertComplete();
}

Code example source: ReactiveX/RxJava

@Test
public void errorMissing() {
  Flowable.create(source, BackpressureStrategy.MISSING).subscribe(ts);
  source.onNext(1);
  source.onNext(2);
  source.onError(new TestException());
  ts.request(1);
  ts.assertValues(1, 2);
  ts.assertError(TestException.class);
  ts.assertNotComplete();
}

Code example source: ReactiveX/RxJava

@Test
public void takeNoCancel() {
  Flowable.create(sourceNoCancel, BackpressureStrategy.BUFFER).take(2).subscribe(ts);
  sourceNoCancel.onNext(1);
  sourceNoCancel.onNext(2);
  sourceNoCancel.onComplete();
  ts.request(2);
  ts.assertValues(1, 2);
  ts.assertNoErrors();
  ts.assertComplete();
}

Code example source: ReactiveX/RxJava

@Test
public void unsubscribedMissing() {
  Flowable.create(source, BackpressureStrategy.MISSING).subscribe(ts);
  ts.cancel();
  source.onNext(1);
  source.onNext(2);
  source.onError(new TestException());
  ts.request(1);
  ts.assertNoValues();
  ts.assertNoErrors();
  ts.assertNotComplete();
}

Code example source: ReactiveX/RxJava

@Test
public void errorLatest() {
  Flowable.create(source, BackpressureStrategy.LATEST).subscribe(ts);
  source.onNext(1);
  source.onNext(2);
  source.onError(new TestException());
  ts.assertNoValues();
  ts.request(1);
  ts.assertValues(2);
  ts.assertError(TestException.class);
  ts.assertNotComplete();
}

Code example source: ReactiveX/RxJava

@Test
public void unsubscribedError() {
  Flowable.create(source, BackpressureStrategy.ERROR).subscribe(ts);
  ts.cancel();
  source.onNext(1);
  source.onNext(2);
  source.onError(new TestException());
  ts.request(1);
  ts.assertNoValues();
  ts.assertNoErrors();
  ts.assertNotComplete();
}

Code example source: ReactiveX/RxJava

@Test
public void unsubscribedBuffer() {
  Flowable.create(source, BackpressureStrategy.BUFFER).subscribe(ts);
  ts.cancel();
  source.onNext(1);
  source.onNext(2);
  source.onError(new TestException());
  ts.request(1);
  ts.assertNoValues();
  ts.assertNoErrors();
  ts.assertNotComplete();
}
