Usage and code examples for the io.reactivex.Flowable.create() method

Reposted by x33g5p2x on 2022-01-19, filed under: Other

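This article collects code examples for the Java method io.reactivex.Flowable.create() and shows how it is used in practice. The examples were extracted from selected projects found on platforms such as GitHub, Stack Overflow and Maven, and are intended as a practical reference. Details of the Flowable.create() method:
Package path: io.reactivex.Flowable
Class name: Flowable
Method name: create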

Introduction to Flowable.create

Provides an API (via a cold Flowable) that bridges the reactive world with the callback-style, generally non-backpressured world.

Example:

    Flowable.<Event>create(emitter -> {
        Callback listener = new Callback() {
            @Override
            public void onEvent(Event e) {
                emitter.onNext(e);
                if (e.isLast()) {
                    emitter.onComplete();
                }
            }

            @Override
            public void onFailure(Exception e) {
                emitter.onError(e);
            }
        };

        AutoCloseable c = api.someMethod(listener);

        emitter.setCancellable(c::close);

    }, BackpressureStrategy.BUFFER);
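
The example relies on an `api` object, a `Callback` interface and an `Event` type that are not shown. To illustrate the callback-style world being bridged, here is a minimal plain-Java sketch of what such an API could look like. Apart from `someMethod`, `onEvent`, `onFailure` and `isLast`, which mirror the names used in the example, every name here (`CallbackApi`, `CallbackApiDemo`, `fire`, `id`) is a hypothetical stand-in and not part of RxJava.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

/** Demo entry point; every type in this file is a hypothetical stand-in. */
final class CallbackApiDemo {
    /** Fires three events but closes the handle after the second; returns the ids seen. */
    static List<Integer> run() {
        CallbackApi api = new CallbackApi();
        List<Integer> seen = new ArrayList<>();
        AutoCloseable handle = api.someMethod(new Callback() {
            @Override public void onEvent(Event e) { seen.add(e.id()); }
            @Override public void onFailure(Exception e) { throw new IllegalStateException(e); }
        });
        api.fire(new Event(1, false));
        api.fire(new Event(2, false));
        try {
            handle.close();            // unregisters the listener
        } catch (Exception ex) {
            throw new IllegalStateException(ex);
        }
        api.fire(new Event(3, true));  // not delivered: nobody is listening any more
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(run());     // prints [1, 2]
    }
}

/** Assumed event shape; isLast() marks the final event of a stream. */
final class Event {
    private final int id;
    private final boolean last;
    Event(int id, boolean last) { this.id = id; this.last = last; }
    int id() { return id; }
    boolean isLast() { return last; }
}

/** Assumed listener interface, shaped after the anonymous class in the example. */
interface Callback {
    void onEvent(Event e);
    void onFailure(Exception e);
}

/** Assumed push-style API: it calls listeners whenever it likes and knows nothing about demand. */
final class CallbackApi {
    private final List<Callback> listeners = new CopyOnWriteArrayList<>();

    /** Registers a listener; closing the returned handle unregisters it. */
    AutoCloseable someMethod(Callback listener) {
        listeners.add(listener);
        return () -> listeners.remove(listener);
    }

    /** Pushes one event to every registered listener. */
    void fire(Event e) {
        for (Callback c : listeners) {
            c.onEvent(e);
        }
    }
}
```

Closing the handle is the action that `emitter.setCancellable(c::close)` registers in the example, so under these assumptions the listener would stop receiving events once the sequence is cancelled.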

You should call the FlowableEmitter onNext, onError and onComplete methods in a serialized fashion. The rest of its methods are thread-safe.

Backpressure: The backpressure behavior is determined by the mode parameter.

Scheduler: create does not operate by default on a particular Scheduler.
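
To make the effect of the mode parameter more concrete, the following toy, single-threaded model in plain Java shows how the five BackpressureStrategy values (MISSING, ERROR, BUFFER, DROP, LATEST) treat items that arrive while the downstream has requested nothing. It is only a sketch. `BackpressureModel` and its methods are hypothetical, it is not how RxJava implements `create`, and it ignores completion, cancellation and thread safety.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Toy model of the five modes; hypothetical, NOT RxJava code. */
final class BackpressureModel {
    enum Mode { MISSING, ERROR, BUFFER, DROP, LATEST }

    private final Mode mode;
    private final Deque<Integer> pending = new ArrayDeque<>(); // items held for later
    private final List<String> received = new ArrayList<>();   // what downstream saw
    private long demand;                                       // outstanding requests
    private boolean terminated;

    BackpressureModel(Mode mode) { this.mode = mode; }

    /** Downstream asks for n more items; held items are delivered first. */
    void request(long n) {
        demand += n;
        while (demand > 0 && !pending.isEmpty()) {
            demand--;
            received.add("next:" + pending.poll());
        }
    }

    /** The callback side produces an item. */
    void emit(int value) {
        if (terminated) {
            return;
        }
        if (mode == Mode.MISSING) {          // passes items through, ignoring demand
            received.add("next:" + value);
            return;
        }
        if (demand > 0) {                    // demand available: deliver directly
            demand--;
            received.add("next:" + value);
            return;
        }
        switch (mode) {                      // no demand: the mode decides
            case ERROR:
                terminated = true;
                received.add("error:MissingBackpressure");
                break;
            case BUFFER:
                pending.add(value);          // keep everything
                break;
            case LATEST:
                pending.clear();             // keep only the newest item
                pending.add(value);
                break;
            default:                         // DROP: discard the item
                break;
        }
    }

    List<String> received() { return received; }

    /** Emits 1 and 2 with no demand, then requests one item. */
    static List<String> run(Mode mode) {
        BackpressureModel m = new BackpressureModel(mode);
        m.emit(1);
        m.emit(2);
        m.request(1);
        return m.received();
    }

    public static void main(String[] args) {
        for (Mode mode : Mode.values()) {
            System.out.println(mode + " -> " + run(mode));
        }
    }
}
```

In this model, emitting 1 and 2 before requesting a single item yields `[next:1, next:2]` for MISSING, `[error:MissingBackpressure]` for ERROR, `[next:1]` for BUFFER, `[]` for DROP and `[next:2]` for LATEST.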

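Code examples

The test methods below are excerpts. The fields they use (`source`, `sourceNoCancel` and `ts`) are defined in the enclosing test class and are not shown here. Judging by the assertions, `ts` appears to be a TestSubscriber that starts with no outstanding demand.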

Code example source: ReactiveX/RxJava

    // A null BackpressureStrategy is rejected with a NullPointerException.
    @Test(expected = NullPointerException.class)
    public void modeNull() {
        Flowable.create(new FlowableOnSubscribe<Object>() {
            @Override
            public void subscribe(FlowableEmitter<Object> s) throws Exception { }
        }, null);
    }

Code example source: ReactiveX/RxJava

    // ERROR: emitting without downstream demand fails with MissingBackpressureException.
    @Test
    public void normalError() {
        Flowable.create(source, BackpressureStrategy.ERROR).subscribe(ts);

        source.onNext(1);
        source.onNext(2);
        source.onComplete();

        ts.assertNoValues();
        ts.assertError(MissingBackpressureException.class);
        ts.assertNotComplete();
        Assert.assertEquals("create: could not emit value due to lack of requests", ts.errors().get(0).getMessage());
    }

Code example source: ReactiveX/RxJava

    // LATEST: the held item and the completion arrive once exactly one item is requested.
    @Test
    public void completeInlineExactLatest() {
        Flowable.create(sourceNoCancel, BackpressureStrategy.LATEST).subscribe(ts);

        sourceNoCancel.onNext(1);
        sourceNoCancel.onComplete();

        ts.request(1);

        ts.assertValues(1);
        ts.assertNoErrors();
        ts.assertComplete();
    }

Code example source: ReactiveX/RxJava

    // BUFFER: the buffered item and the completion arrive once demand is signalled.
    @Test
    public void completeInline() {
        Flowable.create(sourceNoCancel, BackpressureStrategy.BUFFER).subscribe(ts);

        sourceNoCancel.onNext(1);
        sourceNoCancel.onComplete();

        ts.request(2);

        ts.assertValues(1);
        ts.assertNoErrors();
        ts.assertComplete();
    }

Code example source: ReactiveX/RxJava

    // MISSING: items are passed downstream even though nothing was requested.
    @Test
    public void normalMissing() {
        Flowable.create(source, BackpressureStrategy.MISSING).subscribe(ts);

        source.onNext(1);
        source.onNext(2);
        source.onComplete();

        ts.assertValues(1, 2);
        ts.assertNoErrors();
        ts.assertComplete();
    }

Code example source: ReactiveX/RxJava

    // LATEST: requesting more than is available still yields the one held item, then completion.
    @Test
    public void completeInlineLatest() {
        Flowable.create(sourceNoCancel, BackpressureStrategy.LATEST).subscribe(ts);

        sourceNoCancel.onNext(1);
        sourceNoCancel.onComplete();

        ts.request(2);

        ts.assertValues(1);
        ts.assertNoErrors();
        ts.assertComplete();
    }

Code example source: ReactiveX/RxJava

    // BUFFER: items emitted before any request are held and delivered when demand arrives.
    @Test
    public void deferredRequest() {
        Flowable.create(source, BackpressureStrategy.BUFFER).subscribe(ts);

        source.onNext(1);
        source.onNext(2);
        source.onComplete();

        ts.request(2);

        ts.assertValues(1, 2);
        ts.assertNoErrors();
        ts.assertComplete();
    }

Code example source: ReactiveX/RxJava

    // BUFFER: the buffered item is delivered before the error.
    @Test
    public void errorInline() {
        Flowable.create(sourceNoCancel, BackpressureStrategy.BUFFER).subscribe(ts);

        sourceNoCancel.onNext(1);
        sourceNoCancel.onError(new TestException());

        ts.request(2);

        ts.assertValues(1);
        ts.assertError(TestException.class);
        ts.assertNotComplete();
    }

Code example source: ReactiveX/RxJava

    // MISSING: with demand requested up front, both items and the completion are delivered.
    @Test
    public void normalMissingRequested() {
        Flowable.create(source, BackpressureStrategy.MISSING).subscribe(ts);

        ts.request(2);

        source.onNext(1);
        source.onNext(2);
        source.onComplete();

        ts.assertValues(1, 2);
        ts.assertNoErrors();
        ts.assertComplete();
    }

Code example source: ReactiveX/RxJava

    // BUFFER: the demand requested up front exactly matches the number of items emitted.
    @Test
    public void requestExact() {
        Flowable.create(source, BackpressureStrategy.BUFFER).subscribe(ts);

        ts.request(2);

        source.onNext(1);
        source.onNext(2);
        source.onComplete();

        ts.assertValues(1, 2);
        ts.assertNoErrors();
        ts.assertComplete();
    }

Code example source: ReactiveX/RxJava

    // LATEST: the held item is delivered before the error.
    @Test
    public void errorInlineLatest() {
        Flowable.create(sourceNoCancel, BackpressureStrategy.LATEST).subscribe(ts);

        sourceNoCancel.onNext(1);
        sourceNoCancel.onError(new TestException());

        ts.request(2);

        ts.assertValues(1);
        ts.assertError(TestException.class);
        ts.assertNotComplete();
    }

Code example source: ReactiveX/RxJava

    // take(1) completes after the first item; the second onNext is not delivered.
    @Test
    public void takeOne() {
        Flowable.create(source, BackpressureStrategy.BUFFER).take(1).subscribe(ts);

        ts.request(2);

        source.onNext(1);
        source.onNext(2);
        source.onComplete();

        ts.assertValues(1);
        ts.assertNoErrors();
        ts.assertComplete();
    }

Code example source: ReactiveX/RxJava

    // Same scenario as takeOne, but using the sourceNoCancel fixture.
    @Test
    public void takeOneNoCancel() {
        Flowable.create(sourceNoCancel, BackpressureStrategy.BUFFER).take(1).subscribe(ts);

        ts.request(2);

        sourceNoCancel.onNext(1);
        sourceNoCancel.onNext(2);
        sourceNoCancel.onComplete();

        ts.assertValues(1);
        ts.assertNoErrors();
        ts.assertComplete();
    }

Code example source: ReactiveX/RxJava

    // LATEST: only the most recent item (2) is kept while there is no demand.
    @Test
    public void normalLatest() {
        Flowable.create(source, BackpressureStrategy.LATEST).subscribe(ts);

        source.onNext(1);
        source.onNext(2);
        source.onComplete();

        ts.assertNoValues();

        ts.request(1);

        ts.assertValues(2);
        ts.assertNoErrors();
        ts.assertComplete();
    }

Code example source: ReactiveX/RxJava

    // MISSING: both items and the error reach the subscriber regardless of demand.
    @Test
    public void errorMissing() {
        Flowable.create(source, BackpressureStrategy.MISSING).subscribe(ts);

        source.onNext(1);
        source.onNext(2);
        source.onError(new TestException());

        ts.request(1);

        ts.assertValues(1, 2);
        ts.assertError(TestException.class);
        ts.assertNotComplete();
    }

Code example source: ReactiveX/RxJava

    // BUFFER with take(2): both buffered items are delivered once they are requested.
    @Test
    public void takeNoCancel() {
        Flowable.create(sourceNoCancel, BackpressureStrategy.BUFFER).take(2).subscribe(ts);

        sourceNoCancel.onNext(1);
        sourceNoCancel.onNext(2);
        sourceNoCancel.onComplete();

        ts.request(2);

        ts.assertValues(1, 2);
        ts.assertNoErrors();
        ts.assertComplete();
    }

Code example source: ReactiveX/RxJava

    // MISSING: after cancel(), neither items nor the error reach the subscriber.
    @Test
    public void unsubscribedMissing() {
        Flowable.create(source, BackpressureStrategy.MISSING).subscribe(ts);

        ts.cancel();

        source.onNext(1);
        source.onNext(2);
        source.onError(new TestException());

        ts.request(1);

        ts.assertNoValues();
        ts.assertNoErrors();
        ts.assertNotComplete();
    }

Code example source: ReactiveX/RxJava

    // LATEST: the most recent item (2) is delivered on request, followed by the error.
    @Test
    public void errorLatest() {
        Flowable.create(source, BackpressureStrategy.LATEST).subscribe(ts);

        source.onNext(1);
        source.onNext(2);
        source.onError(new TestException());

        ts.assertNoValues();

        ts.request(1);

        ts.assertValues(2);
        ts.assertError(TestException.class);
        ts.assertNotComplete();
    }

Code example source: ReactiveX/RxJava

    // ERROR: after cancel(), neither items nor the error reach the subscriber.
    @Test
    public void unsubscribedError() {
        Flowable.create(source, BackpressureStrategy.ERROR).subscribe(ts);

        ts.cancel();

        source.onNext(1);
        source.onNext(2);
        source.onError(new TestException());

        ts.request(1);

        ts.assertNoValues();
        ts.assertNoErrors();
        ts.assertNotComplete();
    }

Code example source: ReactiveX/RxJava

    // BUFFER: after cancel(), neither items nor the error reach the subscriber.
    @Test
    public void unsubscribedBuffer() {
        Flowable.create(source, BackpressureStrategy.BUFFER).subscribe(ts);

        ts.cancel();

        source.onNext(1);
        source.onNext(2);
        source.onError(new TestException());

        ts.request(1);

        ts.assertNoValues();
        ts.assertNoErrors();
        ts.assertNotComplete();
    }
