Usage and code examples of the io.reactivex.Flowable.singleOrError() method

x33g5p2x, reposted on 2022-01-19 under Other

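This article collects code examples of the Java method io.reactivex.Flowable.singleOrError() and shows how Flowable.singleOrError() is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they should be useful as references. Details of Flowable.singleOrError() are as follows:
Package path: io.reactivex.Flowable
Class name: Flowable
Method name: singleOrError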

Introduction to Flowable.singleOrError

Returns a Single that emits the single item emitted by this Flowable if this Flowable emits exactly one item. If this Flowable completes without emitting any items, a NoSuchElementException is signaled; if it emits more than one item, an IllegalArgumentException is signaled.

Backpressure: The operator honors backpressure from downstream and consumes the source Publisher in an unbounded manner (i.e., without applying backpressure).
Scheduler: singleOrError does not operate by default on a particular Scheduler.
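The contract described above can be sketched without RxJava, using the JDK's java.util.concurrent.Flow interfaces as a stand-in for a Reactive Streams Publisher. This is only an illustration of the semantics, not RxJava's implementation: the class SingleOrErrorSketch, the helper fromList, the use of CompletableFuture in place of Single, and the exception message are all assumptions made for this example.

```java
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;

final class SingleOrErrorSketch {

    /** Hypothetical helper: a synchronous publisher that emits the list items, then completes. */
    static <T> Flow.Publisher<T> fromList(List<T> items) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private int index;
            private boolean cancelled;

            @Override
            public void request(long n) {
                long remaining = n;
                while (!cancelled && remaining-- > 0 && index < items.size()) {
                    subscriber.onNext(items.get(index++));
                }
                if (!cancelled && index == items.size()) {
                    cancelled = true;
                    subscriber.onComplete();
                }
            }

            @Override
            public void cancel() {
                cancelled = true;
            }
        });
    }

    /** Sketch of the singleOrError contract; a CompletableFuture stands in for Single. */
    static <T> CompletableFuture<T> singleOrError(Flow.Publisher<T> source) {
        CompletableFuture<T> result = new CompletableFuture<>();
        source.subscribe(new Flow.Subscriber<T>() {
            private Flow.Subscription upstream;
            private T value;
            private boolean hasValue;
            private boolean done;

            @Override
            public void onSubscribe(Flow.Subscription subscription) {
                upstream = subscription;
                // Unbounded consumption: no backpressure is applied to the source.
                subscription.request(Long.MAX_VALUE);
            }

            @Override
            public void onNext(T item) {
                if (done) {
                    return;
                }
                if (hasValue) {
                    // A second item violates the contract: stop the source and fail.
                    done = true;
                    upstream.cancel();
                    result.completeExceptionally(
                            new IllegalArgumentException("more than one item (illustrative message)"));
                    return;
                }
                hasValue = true;
                value = item;
            }

            @Override
            public void onError(Throwable error) {
                if (!done) {
                    done = true;
                    result.completeExceptionally(error);
                }
            }

            @Override
            public void onComplete() {
                if (done) {
                    return;
                }
                done = true;
                if (hasValue) {
                    result.complete(value);
                } else {
                    result.completeExceptionally(new NoSuchElementException());
                }
            }
        });
        return result;
    }

    public static void main(String[] args) {
        for (List<Integer> items : List.of(List.<Integer>of(), List.of(1), List.of(1, 2, 3))) {
            String outcome = singleOrError(fromList(items))
                    .handle((value, error) -> error == null
                            ? "value " + value
                            : "error " + error.getClass().getSimpleName())
                    .join();
            System.out.println(items + " -> " + outcome);
        }
    }
}
```

Running main walks through the three cases: an empty source fails with NoSuchElementException, a one-item source yields that item, and a three-item source fails with IllegalArgumentException after the second item arrives.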

Code examples

Code example source: origin: ReactiveX/RxJava

    // Fragment of an anonymous Function; the enclosing call is elided in the source.
        @Override
        public SingleSource<Object> apply(Flowable<Object> f) throws Exception {
            return f.singleOrError();
        }
    });

Code example source: origin: ReactiveX/RxJava

    // Fragment of an anonymous Function; the enclosing call is elided in the source.
        @Override
        public Object apply(Flowable<Object> f) throws Exception {
            return f.singleOrError();
        }
    }, false, 1, 1, 1);

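Code example source: origin: ReactiveX/RxJava

    // Fragment of an anonymous Function; the enclosing call is elided in the source.
        @Override
        public Flowable<Object> apply(Flowable<Object> f) throws Exception {
            // Convert the resulting Single back into a Flowable
            return f.singleOrError().toFlowable();
        }
    });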

Code example source: origin: ReactiveX/RxJava

    // Fragment of an anonymous Function; the enclosing call is elided in the source.
        @Override
        public Object apply(Flowable<Object> f) throws Exception {
            return f.singleOrError().toFlowable();
        }
    }, false, 1, 1, 1);

Code example source: origin: ReactiveX/RxJava

    /**
     * If this {@code Flowable} completes after emitting a single item, return that item, otherwise
     * throw a {@code NoSuchElementException}.
     * <p>
     * <img width="640" height="315" src="https://github.com/ReactiveX/RxJava/wiki/images/rx-operators/B.single.png" alt="">
     * <dl>
     *  <dt><b>Backpressure:</b></dt>
     *  <dd>The operator consumes the source {@code Flowable} in an unbounded manner
     *  (i.e., no backpressure applied to it).</dd>
     *  <dt><b>Scheduler:</b></dt>
     *  <dd>{@code blockingSingle} does not operate by default on a particular {@link Scheduler}.</dd>
     *  <dt><b>Error handling:</b></dt>
     *  <dd>If the source signals an error, the operator wraps a checked {@link Exception}
     *  into {@link RuntimeException} and throws that. Otherwise, {@code RuntimeException}s and
     *  {@link Error}s are rethrown as they are.</dd>
     * </dl>
     *
     * @return the single item emitted by this {@code Flowable}
     * @see <a href="http://reactivex.io/documentation/operators/first.html">ReactiveX documentation: First</a>
     */
    @CheckReturnValue
    @BackpressureSupport(BackpressureKind.UNBOUNDED_IN)
    @SchedulerSupport(SchedulerSupport.NONE)
    public final T blockingSingle() {
        return singleOrError().blockingGet();
    }
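The error-handling rule in the Javadoc above (checked exceptions are wrapped in a RuntimeException, while RuntimeExceptions and Errors are rethrown unchanged) can be sketched in plain Java. This is a minimal illustration of that policy only, not RxJava's code: the class BlockingErrorPolicySketch and the method blockingCall are hypothetical names, and a Callable stands in for the blocking source.

```java
import java.io.IOException;
import java.util.concurrent.Callable;

final class BlockingErrorPolicySketch {

    /** Hypothetical helper: runs the task and applies the rethrow policy described above. */
    static <T> T blockingCall(Callable<T> task) {
        try {
            return task.call();
        } catch (RuntimeException | Error unchecked) {
            // Unchecked failures are rethrown as they are.
            throw unchecked;
        } catch (Exception checked) {
            // Checked failures are wrapped; the original stays available as the cause.
            throw new RuntimeException(checked);
        }
    }

    public static void main(String[] args) {
        System.out.println(blockingCall(() -> "ok"));
        try {
            blockingCall(() -> {
                throw new IOException("simulated checked failure");
            });
        } catch (RuntimeException e) {
            System.out.println("cause: " + e.getCause().getClass().getSimpleName());
        }
    }
}
```

A caller of a blocking method with this policy therefore needs to inspect getCause() to recover a checked exception, whereas an unchecked exception arrives as the original instance.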

Code example source: origin: ReactiveX/RxJava

    @Test
    public void singleOrErrorError() {
        // An upstream error is passed through unchanged
        Flowable.error(new RuntimeException("error"))
            .singleOrError()
            .test()
            .assertNoValues()
            .assertErrorMessage("error")
            .assertError(RuntimeException.class);
    }

Code example source: origin: ReactiveX/RxJava

    @Test
    public void singleOrErrorNoElement() {
        // An empty source signals NoSuchElementException
        Flowable.empty()
            .singleOrError()
            .test()
            .assertNoValues()
            .assertError(NoSuchElementException.class);
    }

Code example source: origin: ReactiveX/RxJava

    @Test
    public void singleOrError() {
        Flowable.empty()
            .singleOrError()
            .toFlowable()
            .test()
            .assertFailure(NoSuchElementException.class);
    }

Code example source: origin: ReactiveX/RxJava

    @Test
    public void singleOrErrorMultipleElements() {
        // More than one item signals IllegalArgumentException
        Flowable.just(1, 2, 3)
            .singleOrError()
            .test()
            .assertNoValues()
            .assertError(IllegalArgumentException.class);
    }

Code example source: origin: ReactiveX/RxJava

    @Test
    public void singleOrErrorOneElement() {
        // Exactly one item is emitted as the Single's value
        Flowable.just(1)
            .singleOrError()
            .test()
            .assertNoErrors()
            .assertValue(1);
    }

Code example source: origin: akarnokd/RxJava2Jdk8Interop

    /**
     * Returns a CompletionStage that signals the single element of the Flowable,
     * IllegalArgumentException if the Flowable is longer than 1 element
     * or a NoSuchElementException if the Flowable is empty.
     * @param <T> the value type
     * @return the Function to be used with {@code Flowable.to}.
     */
    public static <T> Function<Flowable<T>, CompletionStage<T>> single() {
        return f -> {
            CompletableFuture<T> cf = new CompletableFuture<>();
            f.singleOrError().subscribe(cf::complete, cf::completeExceptionally);
            return cf;
        };
    }

Code example source: origin: mqtt-bee/mqtt-bee

    @Override
    public @NotNull CompletableFuture<@NotNull Mqtt5PublishResult> publish(final @Nullable Mqtt5Publish publish) {
        Checks.notNull(publish, "Publish");
        return RxFutureConverter.toFuture(delegate.publishHalfSafe(Flowable.just(publish)).singleOrError())
                .thenApply(PUBLISH_HANDLER);
    }

Code example source: origin: tsegismont/vertx-musicstore

    private Single<JsonObject> findAlbum(SQLConnection sqlConnection, Long albumId) {
        return sqlConnection.rxQueryStreamWithParams(findAlbumById, new JsonArray().add(albumId))
            .flatMapPublisher(SQLRowStream::toFlowable)
            .map(row -> new JsonObject().put("id", albumId).put("title", row.getString(0)))
            .singleOrError();
    }

Code example source: origin: tsegismont/vertx-musicstore

    private Single<JsonObject> findGenre(SQLConnection sqlConnection, Long genreId) {
        return sqlConnection.rxQueryStreamWithParams(findGenreById, new JsonArray().add(genreId))
            .flatMapPublisher(SQLRowStream::toFlowable)
            .map(row -> new JsonObject().put("id", genreId).put("name", row.getString(0)))
            .singleOrError();
    }

Code example source: origin: tsegismont/vertx-musicstore

    private Single<JsonObject> findAlbum(SQLConnection sqlConnection, Long albumId) {
        return sqlConnection.rxQueryStreamWithParams(findAlbumById, new JsonArray().add(albumId))
            .flatMapPublisher(SQLRowStream::toFlowable)
            .map(row -> new JsonObject()
                .put("id", albumId)
                .put("title", row.getString(0))
                .put("mbAlbumId", row.getString(1)))
            .singleOrError();
    }

Code example source: origin: uk.os.vt/vt-mbtiles

    private static synchronized Single<HashMap<String, String>> queryMetadata(Database dataSource) {
        final URL url = Resources.getResource("metadata_key_value.sql");
        String query;
        try {
            query = Resources.toString(url, Charsets.UTF_8);
        } catch (final IOException ex) {
            return Single.error(ex);
        }
        return dataSource.select(query).get(new ResultSetMapper<HashMap<String, String>>() {
            @Override
            public HashMap<String, String> apply(@Nonnull ResultSet rs) throws SQLException {
                final HashMap<String, String> metadata = new LinkedHashMap<>();
                while (rs.getRow() != 0) {
                    metadata.put(rs.getString("name"), rs.getString("value"));
                    rs.next();
                }
                return metadata;
            }
        }).singleOrError();
    }

Code example source: origin: mqtt-bee/mqtt-bee

    @Override
    public @NotNull Mqtt5PublishResult publish(final @Nullable Mqtt5Publish publish) {
        Checks.notNull(publish, "Publish");
        try {
            return handlePublish(delegate.publishUnsafe(Flowable.just(publish)).singleOrError().blockingGet());
        } catch (final RuntimeException e) {
            throw AsyncRuntimeException.fillInStackTrace(e);
        }
    }

Code example source: origin: com.blackducksoftware.bdio/bdio-tinkerpop

    @SuppressWarnings("CheckReturnValue")
    public void readGraph(InputStream inputStream, String base, Object expandContext, List<TraversalStrategy<?>> strategies, Graph graph) throws IOException {
        // Create a new BDIO document using a new document context derived from the graph context
        RxJavaBdioDocument doc = new RxJavaBdioDocument(frame.context().newBuilder().base(base).expandContext(expandContext).build());

        // The reader SPI allows for graph implementation specific optimizations
        GraphTraversalSource g = graph.traversal().withStrategies(strategies.toArray(new TraversalStrategy<?>[strategies.size()]));
        BlackDuckIoReaderSpi spi = BlackDuckIoSpi.getForGraph(graph).reader(g, options, frame, batchSize);

        try {
            // Read the input stream as sequence of "entries" (individual JSON-LD documents)
            Flowable<Object> entries = doc.read(inputStream);

            // If we are persisting metadata, create a separate subscription just for that
            if (options.metadataLabel().isPresent()) {
                entries = entries.publish().autoConnect(2);
                doc.metadata(entries).singleOrError().subscribe(spi::persistMetadata, RxJavaPlugins::onError);
            }

            // Frame the entries and do a blocking persist
            doc.jsonLd(entries).frame(frame.serialize()).compose(spi::persistFramedEntries).blockingSubscribe();
        } catch (RuntimeException e) {
            Throwable failure = unwrap(e);
            throwIfInstanceOf(failure, IOException.class);
            throwIfUnchecked(failure);
            if (failure instanceof NodeDoesNotExistException) {
                throw new BlackDuckIoReadGraphException("Failed to load BDIO due to invalid references in the input", failure);
            } else if (failure instanceof SQLException) {
                throw new BlackDuckIoReadGraphException("Failed to load BDIO due to a database error", failure);
            }
            // Add a check above and throw a BlackDuckIoReadGraphException with a nice message instead
            throw new IllegalStateException("Unexpected checked exception in readGraph", failure);
        }
    }
