Usage and code examples for the io.reactivex.Flowable.singleOrError() method

Reposted by x33g5p2x on 2022-01-19 under "Other"

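This article collects code examples for the Java method io.reactivex.Flowable.singleOrError() and shows how Flowable.singleOrError() is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they should serve as useful references. Details of Flowable.singleOrError():
Package: io.reactivex
Class name: Flowable
Method name: singleOrError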

Introduction to Flowable.singleOrError

Returns a Single that emits the item emitted by this Flowable if the Flowable emits exactly one item. If the Flowable completes without emitting any items, a NoSuchElementException is signaled; if it emits more than one item, an IllegalArgumentException is signaled.

Backpressure: The operator honors backpressure from downstream and consumes the source Publisher in an unbounded manner (i.e., without applying backpressure).
Scheduler: singleOrError does not operate by default on a particular Scheduler.
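
The three outcomes described above can be modeled without RxJava. The following is a minimal plain-JDK sketch of the contract only, not the library's implementation: the class SingleOrErrorSketch and its helper method are hypothetical names, and a synchronous Iterator stands in for the asynchronous Flowable.

```java
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Hypothetical illustration class; it is not part of RxJava.
final class SingleOrErrorSketch {

    // Mirrors the documented contract over a plain Iterator (an assumption
    // made for illustration; the real operator works on a Flowable).
    static <T> T singleOrError(Iterator<T> source) {
        if (!source.hasNext()) {
            // Source "completed" without emitting anything.
            throw new NoSuchElementException("source completed without items");
        }
        T item = source.next();
        if (source.hasNext()) {
            // A second item exists, so the source is not a single-item source.
            throw new IllegalArgumentException("source emitted more than one item");
        }
        return item;
    }

    public static void main(String[] args) {
        // Exactly one item: the item is returned.
        System.out.println(singleOrError(List.of(42).iterator()));

        // No items: NoSuchElementException.
        try {
            singleOrError(Collections.<Integer>emptyIterator());
        } catch (NoSuchElementException e) {
            System.out.println("empty: " + e.getClass().getSimpleName());
        }

        // More than one item: IllegalArgumentException.
        try {
            singleOrError(List.of(1, 2, 3).iterator());
        } catch (IllegalArgumentException e) {
            System.out.println("many: " + e.getClass().getSimpleName());
        }
    }
}
```

In RxJava itself these outcomes arrive as onSuccess or onError signals on the returned Single rather than as thrown exceptions, as the test examples further down show.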

Code examples

Code example source: ReactiveX/RxJava

// Fragment: anonymous Function body; the enclosing call is omitted in this excerpt.
  @Override
  public SingleSource<Object> apply(Flowable<Object> f) throws Exception {
    return f.singleOrError();
  }
});

Code example source: ReactiveX/RxJava

// Fragment: anonymous Function body; the enclosing call is omitted in this excerpt.
  @Override
  public Object apply(Flowable<Object> f) throws Exception {
    return f.singleOrError();
  }
}, false, 1, 1, 1);

Code example source: ReactiveX/RxJava

// Fragment: anonymous Function body; the enclosing call is omitted in this excerpt.
  @Override
  public Flowable<Object> apply(Flowable<Object> f) throws Exception {
    return f.singleOrError().toFlowable();
  }
});

Code example source: ReactiveX/RxJava

// Fragment: anonymous Function body; the enclosing call is omitted in this excerpt.
  @Override
  public Object apply(Flowable<Object> f) throws Exception {
    return f.singleOrError().toFlowable();
  }
}, false, 1, 1, 1);

Code example source: ReactiveX/RxJava

/**
 * If this {@code Flowable} completes after emitting a single item, return that item, otherwise
 * throw a {@code NoSuchElementException}.
 * <p>
 * <img width="640" height="315" src="https://github.com/ReactiveX/RxJava/wiki/images/rx-operators/B.single.png" alt="">
 * <dl>
 *  <dt><b>Backpressure:</b></dt>
 *  <dd>The operator consumes the source {@code Flowable} in an unbounded manner
 *  (i.e., no backpressure applied to it).</dd>
 *  <dt><b>Scheduler:</b></dt>
 *  <dd>{@code blockingSingle} does not operate by default on a particular {@link Scheduler}.</dd>
 *  <dt><b>Error handling:</b></dt>
 *  <dd>If the source signals an error, the operator wraps a checked {@link Exception}
 *  into {@link RuntimeException} and throws that. Otherwise, {@code RuntimeException}s and
 *  {@link Error}s are rethrown as they are.</dd>
 * </dl>
 *
 * @return the single item emitted by this {@code Flowable}
 * @see <a href="http://reactivex.io/documentation/operators/first.html">ReactiveX documentation: First</a>
 */
@CheckReturnValue
@BackpressureSupport(BackpressureKind.UNBOUNDED_IN)
@SchedulerSupport(SchedulerSupport.NONE)
public final T blockingSingle() {
  return singleOrError().blockingGet();
}
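
The error-handling rule stated in the Javadoc above (checked exceptions are wrapped in a RuntimeException, while RuntimeExceptions and Errors pass through unchanged) can be sketched in plain Java. This is an illustration of the rule as the Javadoc words it, not RxJava's internal code; BlockingErrorRule and wrapOrThrow are hypothetical names chosen for this sketch.

```java
import java.io.IOException;

// Hypothetical illustration class; it is not part of RxJava.
final class BlockingErrorRule {

    // Applies the rule from the Javadoc: Errors are rethrown as they are,
    // RuntimeExceptions are returned unchanged, and anything else (a checked
    // exception) is wrapped in a new RuntimeException.
    static RuntimeException wrapOrThrow(Throwable error) {
        if (error instanceof Error) {
            throw (Error) error;
        }
        if (error instanceof RuntimeException) {
            return (RuntimeException) error;
        }
        return new RuntimeException(error);
    }

    public static void main(String[] args) {
        RuntimeException unchecked = new IllegalStateException("boom");
        // Same instance comes back for an unchecked exception.
        System.out.println(wrapOrThrow(unchecked) == unchecked);

        IOException checked = new IOException("disk");
        // The checked exception becomes the cause of a RuntimeException.
        System.out.println(wrapOrThrow(checked).getCause() == checked);
    }
}
```

A caller of a blocking method that follows this rule would typically inspect getCause() to recover the original checked exception.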

Code example source: ReactiveX/RxJava

@Test
public void singleOrErrorError() {
  Flowable.error(new RuntimeException("error"))
    .singleOrError()
    .test()
    .assertNoValues()
    .assertErrorMessage("error")
    .assertError(RuntimeException.class);
}

Code example source: ReactiveX/RxJava

@Test
public void singleOrErrorNoElement() {
  Flowable.empty()
    .singleOrError()
    .test()
    .assertNoValues()
    .assertError(NoSuchElementException.class);
}

Code example source: ReactiveX/RxJava

@Test
public void singleOrError() {
  Flowable.empty()
    .singleOrError()
    .toFlowable()
    .test()
    .assertFailure(NoSuchElementException.class);
}

Code example source: ReactiveX/RxJava

@Test
public void singleOrErrorMultipleElements() {
  Flowable.just(1, 2, 3)
    .singleOrError()
    .test()
    .assertNoValues()
    .assertError(IllegalArgumentException.class);
}

Code example source: ReactiveX/RxJava

@Test
public void singleOrErrorOneElement() {
  Flowable.just(1)
    .singleOrError()
    .test()
    .assertNoErrors()
    .assertValue(1);
}

Code example source: akarnokd/RxJava2Jdk8Interop

/**
 * Returns a CompletionStage that signals the single element of the Flowable,
 * IllegalArgumentException if the Flowable is longer than 1 element
 * or a NoSuchElementException if the Flowable is empty.
 * @param <T> the value type
 * @return the Function to be used with {@code Flowable.to}.
 */
public static <T> Function<Flowable<T>, CompletionStage<T>> single() {
  return f -> {
    CompletableFuture<T> cf = new CompletableFuture<>();
    f.singleOrError().subscribe(cf::complete, cf::completeExceptionally);
    return cf;
  };
}
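
The same bridging idea can be tried without an RxJava dependency by using the JDK's java.util.concurrent.Flow API in place of Flowable. The sketch below is only an analogy under that assumption: SingleToFuture is a hypothetical class, and its subscriber hand-codes the single-item check that singleOrError() performs in the example above. It requests Long.MAX_VALUE items, which mirrors the unbounded consumption of the source described earlier.

```java
import java.util.NoSuchElementException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;

// Hypothetical illustration class built on the JDK Flow API, not on RxJava.
final class SingleToFuture {

    // Completes the future with the only item of the publisher, or fails it
    // with NoSuchElementException (empty) or IllegalArgumentException (more
    // than one item). Upstream errors are forwarded unchanged.
    static <T> CompletableFuture<T> single(Flow.Publisher<T> source) {
        CompletableFuture<T> cf = new CompletableFuture<>();
        source.subscribe(new Flow.Subscriber<T>() {
            private Flow.Subscription upstream;
            private T item;
            private boolean hasItem;
            private boolean done;

            @Override
            public void onSubscribe(Flow.Subscription subscription) {
                upstream = subscription;
                // Unbounded request: no backpressure is applied to the source.
                subscription.request(Long.MAX_VALUE);
            }

            @Override
            public void onNext(T value) {
                if (done) {
                    return;
                }
                if (hasItem) {
                    // Second item: stop the source and fail the future.
                    done = true;
                    upstream.cancel();
                    cf.completeExceptionally(
                        new IllegalArgumentException("more than one item"));
                    return;
                }
                hasItem = true;
                item = value;
            }

            @Override
            public void onError(Throwable error) {
                if (!done) {
                    done = true;
                    cf.completeExceptionally(error);
                }
            }

            @Override
            public void onComplete() {
                if (done) {
                    return;
                }
                done = true;
                if (hasItem) {
                    cf.complete(item);
                } else {
                    cf.completeExceptionally(new NoSuchElementException());
                }
            }
        });
        return cf;
    }
}
```

With a java.util.concurrent.SubmissionPublisher as the source, submitting one item and then closing the publisher completes the future with that item; calling join() on a failed future surfaces the failure as the cause of a CompletionException.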

Code example source: mqtt-bee/mqtt-bee

@Override
public @NotNull CompletableFuture<@NotNull Mqtt5PublishResult> publish(final @Nullable Mqtt5Publish publish) {
  Checks.notNull(publish, "Publish");
  return RxFutureConverter.toFuture(delegate.publishHalfSafe(Flowable.just(publish)).singleOrError())
      .thenApply(PUBLISH_HANDLER);
}

Code example source: tsegismont/vertx-musicstore

private Single<JsonObject> findAlbum(SQLConnection sqlConnection, Long albumId) {
 return sqlConnection.rxQueryStreamWithParams(findAlbumById, new JsonArray().add(albumId))
  .flatMapPublisher(SQLRowStream::toFlowable)
  .map(row -> new JsonObject().put("id", albumId).put("title", row.getString(0)))
  .singleOrError();
}

Code example source: tsegismont/vertx-musicstore

private Single<JsonObject> findGenre(SQLConnection sqlConnection, Long genreId) {
 return sqlConnection.rxQueryStreamWithParams(findGenreById, new JsonArray().add(genreId))
  .flatMapPublisher(SQLRowStream::toFlowable)
  .map(row -> new JsonObject().put("id", genreId).put("name", row.getString(0)))
  .singleOrError();
}

Code example source: tsegismont/vertx-musicstore

private Single<JsonObject> findAlbum(SQLConnection sqlConnection, Long albumId) {
  return sqlConnection.rxQueryStreamWithParams(findAlbumById, new JsonArray().add(albumId))
    .flatMapPublisher(SQLRowStream::toFlowable)
    .map(row -> new JsonObject()
      .put("id", albumId)
      .put("title", row.getString(0))
      .put("mbAlbumId", row.getString(1)))
    .singleOrError();
}

Code example source: uk.os.vt/vt-mbtiles

private static synchronized Single<HashMap<String, String>> queryMetadata(Database dataSource) {
 final URL url = Resources.getResource("metadata_key_value.sql");
 String query;
 try {
  query = Resources.toString(url, Charsets.UTF_8);
 } catch (final IOException ex) {
  return Single.error(ex);
 }
 return dataSource.select(query).get(new ResultSetMapper<HashMap<String, String>>() {
  @Override
  public HashMap<String, String> apply(@Nonnull ResultSet rs) throws SQLException {
   final HashMap<String, String> metadata = new LinkedHashMap<>();
   // Collect rows into one map; ResultSet.getRow() returns 0 when there is no current row.
   while (rs.getRow() != 0) {
    metadata.put(rs.getString("name"), rs.getString("value"));
    rs.next();
   }
   return metadata;
  }
 }).singleOrError();
}

Code example source: mqtt-bee/mqtt-bee

@Override
public @NotNull Mqtt5PublishResult publish(final @Nullable Mqtt5Publish publish) {
  Checks.notNull(publish, "Publish");
  try {
    return handlePublish(delegate.publishUnsafe(Flowable.just(publish)).singleOrError().blockingGet());
  } catch (final RuntimeException e) {
    throw AsyncRuntimeException.fillInStackTrace(e);
  }
}

Code example source: com.blackducksoftware.bdio/bdio-tinkerpop

@SuppressWarnings("CheckReturnValue")
public void readGraph(InputStream inputStream, String base, Object expandContext, List<TraversalStrategy<?>> strategies, Graph graph) throws IOException {
  // Create a new BDIO document using a new document context derived from the graph context
  RxJavaBdioDocument doc = new RxJavaBdioDocument(frame.context().newBuilder().base(base).expandContext(expandContext).build());
  // The reader SPI allows for graph implementation specific optimizations
  GraphTraversalSource g = graph.traversal().withStrategies(strategies.toArray(new TraversalStrategy<?>[strategies.size()]));
  BlackDuckIoReaderSpi spi = BlackDuckIoSpi.getForGraph(graph).reader(g, options, frame, batchSize);
  try {
    // Read the input stream as sequence of "entries" (individual JSON-LD documents)
    Flowable<Object> entries = doc.read(inputStream);
    // If we are persisting metadata, create a separate subscription just for that
    if (options.metadataLabel().isPresent()) {
      entries = entries.publish().autoConnect(2);
      doc.metadata(entries).singleOrError().subscribe(spi::persistMetadata, RxJavaPlugins::onError);
    }
    // Frame the entries and do a blocking persist
    doc.jsonLd(entries).frame(frame.serialize()).compose(spi::persistFramedEntries).blockingSubscribe();
  } catch (RuntimeException e) {
    Throwable failure = unwrap(e);
    throwIfInstanceOf(failure, IOException.class);
    throwIfUnchecked(failure);
    if (failure instanceof NodeDoesNotExistException) {
      throw new BlackDuckIoReadGraphException("Failed to load BDIO due to invalid references in the input", failure);
    } else if (failure instanceof SQLException) {
      throw new BlackDuckIoReadGraphException("Failed to load BDIO due to a database error", failure);
    }
    // Add a check above and throw a BlackDuckIoReadGraphException with a nice message instead
    throw new IllegalStateException("Unexpected checked exception in readGraph", failure);
  }
}
