[英]Returns a Single that emits the single item emitted by this Flowable, if this Flowable emits only a single item, otherwise if this Flowable completes without emitting any items a NoSuchElementException will be signaled and if this Flowable emits more than one item, an IllegalArgumentException will be signaled.
Backpressure: The operator honors backpressure from downstream and consumes the source Publisher in an unbounded manner (i.e., without applying backpressure). Scheduler: singleOrError does not operate by default on a particular Scheduler.
代码示例来源:origin: ReactiveX/RxJava
public SingleSource<Object> apply(Flowable<Object> f) throws Exception {
return f.singleOrError();
代码示例来源:origin: ReactiveX/RxJava
public Object apply(Flowable<Object> f) throws Exception {
return f.singleOrError();
}, false, 1, 1, 1);
代码示例来源:origin: ReactiveX/RxJava
public Flowable<Object> apply(Flowable<Object> f) throws Exception {
return f.singleOrError().toFlowable();
代码示例来源:origin: ReactiveX/RxJava
public Object apply(Flowable<Object> f) throws Exception {
return f.singleOrError().toFlowable();
}, false, 1, 1, 1);
代码示例来源:origin: ReactiveX/RxJava
* If this {@code Flowable} completes after emitting a single item, return that item, otherwise
* throw a {@code NoSuchElementException}.
* <p>
* <img width="640" height="315" src="https://github.com/ReactiveX/RxJava/wiki/images/rx-operators/B.single.png" alt="">
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The operator consumes the source {@code Flowable} in an unbounded manner
* (i.e., no backpressure applied to it).</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code blockingSingle} does not operate by default on a particular {@link Scheduler}.</dd>
* <dt><b>Error handling:</b></dt>
* <dd>If the source signals an error, the operator wraps a checked {@link Exception}
* into {@link RuntimeException} and throws that. Otherwise, {@code RuntimeException}s and
* {@link Error}s are rethrown as they are.</dd>
* </dl>
* @return the single item emitted by this {@code Flowable}
* @see <a href="http://reactivex.io/documentation/operators/first.html">ReactiveX documentation: First</a>
public final T blockingSingle() {
return singleOrError().blockingGet();
代码示例来源:origin: redisson/redisson
* If this {@code Flowable} completes after emitting a single item, return that item, otherwise
* throw a {@code NoSuchElementException}.
* <p>
* <img width="640" height="315" src="https://github.com/ReactiveX/RxJava/wiki/images/rx-operators/B.single.png" alt="">
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The operator consumes the source {@code Flowable} in an unbounded manner
* (i.e., no backpressure applied to it).</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code blockingSingle} does not operate by default on a particular {@link Scheduler}.</dd>
* <dt><b>Error handling:</b></dt>
* <dd>If the source signals an error, the operator wraps a checked {@link Exception}
* into {@link RuntimeException} and throws that. Otherwise, {@code RuntimeException}s and
* {@link Error}s are rethrown as they are.</dd>
* </dl>
* @return the single item emitted by this {@code Flowable}
* @see <a href="http://reactivex.io/documentation/operators/first.html">ReactiveX documentation: First</a>
public final T blockingSingle() {
return singleOrError().blockingGet();
代码示例来源:origin: ReactiveX/RxJava
public void singleOrErrorError() {
Flowable.error(new RuntimeException("error"))
代码示例来源:origin: ReactiveX/RxJava
public void singleOrErrorNoElement() {
代码示例来源:origin: ReactiveX/RxJava
public void singleOrError() {
代码示例来源:origin: ReactiveX/RxJava
public void singleOrErrorMultipleElements() {
Flowable.just(1, 2, 3)
代码示例来源:origin: ReactiveX/RxJava
public void singleOrErrorOneElement() {
代码示例来源:origin: akarnokd/RxJava2Jdk8Interop
* Returns a CompletionStage that signals the single element of the Flowable,
* IllegalArgumentException if the Flowable is longer than 1 element
* or a NoSuchElementException if the Flowable is empty.
* @param <T> the value type
* @return the Function to be used with {@code Flowable.to}.
public static <T> Function<Flowable<T>, CompletionStage<T>> single() {
return f -> {
CompletableFuture<T> cf = new CompletableFuture<>();
f.singleOrError().subscribe(cf::complete, cf::completeExceptionally);
return cf;
代码示例来源:origin: mqtt-bee/mqtt-bee
public @NotNull CompletableFuture<@NotNull Mqtt5PublishResult> publish(final @Nullable Mqtt5Publish publish) {
Checks.notNull(publish, "Publish");
return RxFutureConverter.toFuture(delegate.publishHalfSafe(Flowable.just(publish)).singleOrError())
代码示例来源:origin: tsegismont/vertx-musicstore
private Single<JsonObject> findAlbum(SQLConnection sqlConnection, Long albumId) {
return sqlConnection.rxQueryStreamWithParams(findAlbumById, new JsonArray().add(albumId))
.map(row -> new JsonObject().put("id", albumId).put("title", row.getString(0)))
代码示例来源:origin: tsegismont/vertx-musicstore
private Single<JsonObject> findGenre(SQLConnection sqlConnection, Long genreId) {
return sqlConnection.rxQueryStreamWithParams(findGenreById, new JsonArray().add(genreId))
.map(row -> new JsonObject().put("id", genreId).put("name", row.getString(0)))
代码示例来源:origin: tsegismont/vertx-musicstore
private Single<JsonObject> findAlbum(SQLConnection sqlConnection, Long albumId) {
return sqlConnection.rxQueryStreamWithParams(findAlbumById, new JsonArray().add(albumId))
.map(row -> new JsonObject()
.put("id", albumId)
.put("title", row.getString(0))
.put("mbAlbumId", row.getString(1)))
代码示例来源:origin: uk.os.vt/vt-mbtiles
private static synchronized Single<HashMap<String, String>> queryMetadata(Database dataSource) {
final URL url = Resources.getResource("metadata_key_value.sql");
String query;
try {
query = Resources.toString(url, Charsets.UTF_8);
} catch (final IOException ex) {
return Single.error(ex);
return dataSource.select(query).get(new ResultSetMapper<HashMap<String, String>>() {
public HashMap<String, String> apply(@Nonnull ResultSet rs) throws SQLException {
final HashMap<String, String> metadata = new LinkedHashMap<>();
while (rs.getRow() != 0) {
metadata.put(rs.getString("name"), rs.getString("value"));
return metadata;
代码示例来源:origin: mqtt-bee/mqtt-bee
public @NotNull Mqtt5PublishResult publish(final @Nullable Mqtt5Publish publish) {
Checks.notNull(publish, "Publish");
try {
return handlePublish(delegate.publishUnsafe(Flowable.just(publish)).singleOrError().blockingGet());
} catch (final RuntimeException e) {
throw AsyncRuntimeException.fillInStackTrace(e);
代码示例来源:origin: com.blackducksoftware.bdio/bdio-tinkerpop
public void readGraph(InputStream inputStream, String base, Object expandContext, List<TraversalStrategy<?>> strategies, Graph graph) throws IOException {
// Create a new BDIO document using a new document context derived from the graph context
RxJavaBdioDocument doc = new RxJavaBdioDocument(frame.context().newBuilder().base(base).expandContext(expandContext).build());
// The reader SPI allows for graph implementation specific optimizations
GraphTraversalSource g = graph.traversal().withStrategies(strategies.toArray(new TraversalStrategy<?>[strategies.size()]));
BlackDuckIoReaderSpi spi = BlackDuckIoSpi.getForGraph(graph).reader(g, options, frame, batchSize);
try {
// Read the input stream as sequence of "entries" (individual JSON-LD documents)
Flowable<Object> entries = doc.read(inputStream);
// If we are persisting metadata, create a separate subscription just for that
if (options.metadataLabel().isPresent()) {
entries = entries.publish().autoConnect(2);
doc.metadata(entries).singleOrError().subscribe(spi::persistMetadata, RxJavaPlugins::onError);
// Frame the entries and do a blocking persist
} catch (RuntimeException e) {
Throwable failure = unwrap(e);
throwIfInstanceOf(failure, IOException.class);
if (failure instanceof NodeDoesNotExistException) {
throw new BlackDuckIoReadGraphException("Failed to load BDIO due to invalid references in the input", failure);
} else if (failure instanceof SQLException) {
throw new BlackDuckIoReadGraphException("Failed to load BDIO due to a database error", failure);
// Add a check above and throw a BlackDuckIoReadGraphException with a nice message instead
throw new IllegalStateException("Unexpected checked exception in readGraph", failure);