Usage and code examples of the io.reactivex.Flowable.generate() method

x33g5p2x, reposted on 2022-01-19 in Other

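This article collects code examples for the Java method io.reactivex.Flowable.generate() and shows how Flowable.generate() is used in practice. The examples come mainly from GitHub, Stack Overflow, and Maven, extracted from selected projects, and are intended as a reference. Details of the Flowable.generate() method:
Package path: io.reactivex.Flowable
Class name: Flowable
Method name: generate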

Introduction to Flowable.generate

Returns a cold, synchronous, stateless and backpressure-aware generator of values. (This wording describes the single-argument overload; the overloads that take an initial-state Callable are documented as stateful.)
Backpressure: The operator honors downstream backpressure.
Scheduler: generate does not operate by default on a particular Scheduler.
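
As a minimal usage sketch of the description above, assuming RxJava 2.x (the `io.reactivex` package) is on the classpath, the stateful overload `generate(Callable, BiFunction)` can be driven with lambdas. The class name `GenerateSketch` and the helper `squares` are illustrative and not part of RxJava.

```java
import io.reactivex.Flowable;

import java.util.List;

public class GenerateSketch {

    // Builds a cold Flowable that emits the first n squares (0, 1, 4, ...).
    // The state is a simple counter; each generator call emits at most one item.
    static Flowable<Integer> squares(final int n) {
        return Flowable.<Integer, Integer>generate(
            () -> 0,                          // initial state, created for each subscriber
            (i, emitter) -> {
                if (i < n) {
                    emitter.onNext(i * i);    // exactly one onNext in this call
                } else {
                    emitter.onComplete();     // terminate once n items were produced
                }
                return i + 1;                 // next state passed to the following call
            });
    }

    public static void main(String[] args) {
        List<Integer> values = squares(5).toList().blockingGet();
        System.out.println(values);           // prints [0, 1, 4, 9, 16]
    }
}
```

The generator stops being called once downstream cancels, so `squares(1000).take(3)` does not compute all 1000 values.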

Code examples

Code example source: origin: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void generateStateFunctionInitialStateNull() {
  Flowable.generate(null, new BiFunction<Object, Emitter<Object>, Object>() {
    @Override
    public Object apply(Object s, Emitter<Object> o) {
      o.onNext(1); return s;
    }
  });
}

Code example source: origin: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void generateStateConsumerNull() {
  Flowable.generate(new Callable<Integer>() {
    @Override
    public Integer call() {
      return 1;
    }
  }, (BiConsumer<Integer, Emitter<Object>>)null);
}

Code example source: origin: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void generateStateConsumerInitialStateNull() {
  BiConsumer<Integer, Emitter<Integer>> generator = new BiConsumer<Integer, Emitter<Integer>>() {
    @Override
    public void accept(Integer s, Emitter<Integer> o) {
      o.onNext(1);
    }
  };
  Flowable.generate(null, generator);
}

Code example source: origin: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void generateConsumerNull() {
  Flowable.generate(null);
}

Code example source: origin: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void generateFunctionDisposeNull() {
  Flowable.generate(new Callable<Object>() {
    @Override
    public Object call() {
      return 1;
    }
  }, new BiFunction<Object, Emitter<Object>, Object>() {
    @Override
    public Object apply(Object s, Emitter<Object> o) {
      o.onNext(1); return s;
    }
  }, null);
}

Code example source: origin: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void generateConsumerDisposeNull() {
  BiConsumer<Integer, Emitter<Integer>> generator = new BiConsumer<Integer, Emitter<Integer>>() {
    @Override
    public void accept(Integer s, Emitter<Integer> o) {
      o.onNext(1);
    }
  };
  Flowable.generate(new Callable<Integer>() {
    @Override
    public Integer call() {
      return 1;
    }
  }, generator, null);
}

Code example source: origin: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void generateConsumerEmitsNull() {
  Flowable.generate(new Consumer<Emitter<Object>>() {
    @Override
    public void accept(Emitter<Object> s) {
      s.onNext(null);
    }
  }).blockingLast();
}

Code example source: origin: ReactiveX/RxJava

@Test
public void generateFunctionStateNullAllowed() {
  Flowable.generate(new Callable<Object>() {
    @Override
    public Object call() {
      return null;
    }
  }, new BiFunction<Object, Emitter<Object>, Object>() {
    @Override
    public Object apply(Object s, Emitter<Object> o) {
      o.onComplete(); return s;
    }
  }).blockingSubscribe();
}

Code example source: origin: ReactiveX/RxJava

@Test
public void generateConsumerStateNullAllowed() {
  BiConsumer<Integer, Emitter<Integer>> generator = new BiConsumer<Integer, Emitter<Integer>>() {
    @Override
    public void accept(Integer s, Emitter<Integer> o) {
      o.onComplete();
    }
  };
  Flowable.generate(new Callable<Integer>() {
    @Override
    public Integer call() {
      return null;
    }
  }, generator).blockingSubscribe();
}

Code example source: origin: ReactiveX/RxJava

@Override
public Publisher<Long> createPublisher(final long elements) {
  return Flowable.generate(Functions.justCallable(0L),
    new BiFunction<Long, Emitter<Long>, Long>() {
      @Override
      public Long apply(Long s, Emitter<Long> e) throws Exception {
        e.onNext(s);
        if (++s == elements) {
          e.onComplete();
        }
        return s;
      }
    }, Functions.<Long>emptyConsumer());
}

Code example source: origin: ReactiveX/RxJava

@Test
public void multipleOnNext() {
  Flowable.generate(new Consumer<Emitter<Object>>() {
    @Override
    public void accept(Emitter<Object> e) throws Exception {
      e.onNext(1);
      e.onNext(2);
    }
  })
  .test(1)
  .assertFailure(IllegalStateException.class, 1);
}
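
The test above depends on the rule that a generator callback may call onNext at most once per invocation; a second call in the same invocation is turned into an IllegalStateException. The following library-free sketch models only that rule. `ModelEmitter` and `run` are hypothetical names used for illustration and are not RxJava's implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class OneSignalPerCallModel {

    /** Hypothetical stand-in for an emitter; it remembers whether onNext ran in the current call. */
    static final class ModelEmitter<T> {
        final List<T> delivered = new ArrayList<>();
        boolean emittedThisCall;
        boolean done;
        Throwable error;

        void onNext(T value) {
            if (done) {
                return;                       // ignore signals after termination
            }
            if (emittedThisCall) {
                // Second onNext within one generator call: fail instead of delivering.
                error = new IllegalStateException("onNext already called in this generator call");
                done = true;
                return;
            }
            emittedThisCall = true;
            delivered.add(value);
        }

        void onComplete() {
            done = true;
        }
    }

    /** Calls the generator once per requested item until it terminates. */
    static <T> ModelEmitter<T> run(Consumer<ModelEmitter<T>> generator, long requested) {
        ModelEmitter<T> emitter = new ModelEmitter<>();
        for (long i = 0; i < requested && !emitter.done; i++) {
            emitter.emittedThisCall = false;  // a new call may emit one item again
            generator.accept(emitter);
        }
        return emitter;
    }

    public static void main(String[] args) {
        ModelEmitter<Integer> result = run(e -> { e.onNext(1); e.onNext(2); }, 1);
        // prints [1] IllegalStateException
        System.out.println(result.delivered + " " + result.error.getClass().getSimpleName());
    }
}
```

In this model the first value is still delivered before the failure, which mirrors the `assertFailure(IllegalStateException.class, 1)` expectation in the test.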

Code example source: origin: ReactiveX/RxJava

@Test
public void badRequest() {
  TestHelper.assertBadRequestReported(Flowable.generate(new Callable<Object>() {
      @Override
      public Object call() throws Exception {
        return 1;
      }
    }, new BiConsumer<Object, Emitter<Object>>() {
      @Override
      public void accept(Object s, Emitter<Object> e) throws Exception {
        e.onComplete();
      }
    }, Functions.emptyConsumer()));
}

Code example source: origin: ReactiveX/RxJava

@Test
public void multipleOnComplete() {
  Flowable.generate(new Consumer<Emitter<Object>>() {
    @Override
    public void accept(Emitter<Object> e) throws Exception {
      e.onComplete();
      e.onComplete();
    }
  })
  .test(1)
  .assertResult();
}

Code example source: origin: ReactiveX/RxJava

public static Flowable<Event> getEventStream(final String type, final int numInstances) {
  return Flowable.<Event>generate(new EventConsumer(type, numInstances))
      .subscribeOn(Schedulers.newThread());
}

Code example source: origin: ReactiveX/RxJava

@Test
public void dispose() {
  TestHelper.checkDisposed(Flowable.generate(new Callable<Object>() {
      @Override
      public Object call() throws Exception {
        return 1;
      }
    }, new BiConsumer<Object, Emitter<Object>>() {
      @Override
      public void accept(Object s, Emitter<Object> e) throws Exception {
        e.onComplete();
      }
    }, Functions.emptyConsumer()));
}

Code example source: origin: ReactiveX/RxJava

@Test
public void stateSupplierThrows() {
  Flowable.generate(new Callable<Object>() {
    @Override
    public Object call() throws Exception {
      throw new TestException();
    }
  }, new BiConsumer<Object, Emitter<Object>>() {
    @Override
    public void accept(Object s, Emitter<Object> e) throws Exception {
      e.onNext(s);
    }
  }, Functions.emptyConsumer())
  .test()
  .assertFailure(TestException.class);
}

Code example source: origin: ReactiveX/RxJava

@Test
public void generatorThrows() {
  Flowable.generate(new Callable<Object>() {
    @Override
    public Object call() throws Exception {
      return 1;
    }
  }, new BiConsumer<Object, Emitter<Object>>() {
    @Override
    public void accept(Object s, Emitter<Object> e) throws Exception {
      throw new TestException();
    }
  }, Functions.emptyConsumer())
  .test()
  .assertFailure(TestException.class);
}

Code example source: origin: ReactiveX/RxJava

@Test
public void nullError() {
  final int[] call = { 0 };
  Flowable.generate(Functions.justCallable(1),
  new BiConsumer<Integer, Emitter<Object>>() {
    @Override
    public void accept(Integer s, Emitter<Object> e) throws Exception {
      try {
        e.onError(null);
      } catch (NullPointerException ex) {
        call[0]++;
      }
    }
  }, Functions.emptyConsumer())
  .test()
  .assertFailure(NullPointerException.class);
  assertEquals(0, call[0]);
}

Code example source: origin: ReactiveX/RxJava

@Test
public void rebatchAndTake() {
  Flowable.generate(new Callable<Object>() {
    @Override
    public Object call() throws Exception {
      return 1;
    }
  }, new BiConsumer<Object, Emitter<Object>>() {
    @Override
    public void accept(Object s, Emitter<Object> e) throws Exception {
      e.onNext(1);
    }
  }, Functions.emptyConsumer())
  .rebatchRequests(1)
  .take(5)
  .test()
  .assertResult(1, 1, 1, 1, 1);
}

Code example source: origin: ReactiveX/RxJava

@Test
public void multipleOnError() {
  List<Throwable> errors = TestHelper.trackPluginErrors();
  try {
    Flowable.generate(new Consumer<Emitter<Object>>() {
      @Override
      public void accept(Emitter<Object> e) throws Exception {
        e.onError(new TestException("First"));
        e.onError(new TestException("Second"));
      }
    })
    .test(1)
    .assertFailure(TestException.class);
    TestHelper.assertUndeliverable(errors, 0, TestException.class, "Second");
  } finally {
    RxJavaPlugins.reset();
  }
}
