本文整理了Java中io.reactivex.Flowable.singleOrError()
方法的一些代码示例,展示了Flowable.singleOrError()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Flowable.singleOrError()
方法的具体详情如下:
包路径:io.reactivex.Flowable
类名称:Flowable
方法名:singleOrError
[英]Returns a Single that emits the single item emitted by this Flowable, if this Flowable emits only a single item, otherwise if this Flowable completes without emitting any items a NoSuchElementException will be signaled and if this Flowable emits more than one item, an IllegalArgumentException will be signaled.
Backpressure: The operator honors backpressure from downstream and consumes the source Publisher in an unbounded manner (i.e., without applying backpressure). Scheduler: singleOrError does not operate by default on a particular Scheduler.
[中]返回发出此可流文件发出的单个项的单个项,如果此可流文件仅发出单个项,否则,如果此可流文件在未发出任何项的情况下完成,则将发出NosTouchElementException信号,如果此可流文件发出多个项,则将发出IllegalArgumentException信号。
背压:操作员接受来自下游的背压,并以无限制的方式(即不施加背压)消耗源发布服务器。调度程序:默认情况下,singleOrError不会在特定调度程序上运行。
代码示例来源:origin: ReactiveX/RxJava
@Override
public SingleSource<Object> apply(Flowable<Object> f) throws Exception {
return f.singleOrError();
}
});
代码示例来源:origin: ReactiveX/RxJava
@Override
public Object apply(Flowable<Object> f) throws Exception {
return f.singleOrError();
}
}, false, 1, 1, 1);
代码示例来源:origin: ReactiveX/RxJava
@Override
public Flowable<Object> apply(Flowable<Object> f) throws Exception {
return f.singleOrError().toFlowable();
}
});
代码示例来源:origin: ReactiveX/RxJava
@Override
public Object apply(Flowable<Object> f) throws Exception {
return f.singleOrError().toFlowable();
}
}, false, 1, 1, 1);
代码示例来源:origin: ReactiveX/RxJava
/**
* If this {@code Flowable} completes after emitting a single item, return that item, otherwise
* throw a {@code NoSuchElementException}.
* <p>
* <img width="640" height="315" src="https://github.com/ReactiveX/RxJava/wiki/images/rx-operators/B.single.png" alt="">
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The operator consumes the source {@code Flowable} in an unbounded manner
* (i.e., no backpressure applied to it).</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code blockingSingle} does not operate by default on a particular {@link Scheduler}.</dd>
* <dt><b>Error handling:</b></dt>
* <dd>If the source signals an error, the operator wraps a checked {@link Exception}
* into {@link RuntimeException} and throws that. Otherwise, {@code RuntimeException}s and
* {@link Error}s are rethrown as they are.</dd>
* </dl>
*
* @return the single item emitted by this {@code Flowable}
* @see <a href="http://reactivex.io/documentation/operators/first.html">ReactiveX documentation: First</a>
*/
@CheckReturnValue
@BackpressureSupport(BackpressureKind.UNBOUNDED_IN)
@SchedulerSupport(SchedulerSupport.NONE)
public final T blockingSingle() {
return singleOrError().blockingGet();
}
代码示例来源:origin: redisson/redisson
/**
* If this {@code Flowable} completes after emitting a single item, return that item, otherwise
* throw a {@code NoSuchElementException}.
* <p>
* <img width="640" height="315" src="https://github.com/ReactiveX/RxJava/wiki/images/rx-operators/B.single.png" alt="">
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The operator consumes the source {@code Flowable} in an unbounded manner
* (i.e., no backpressure applied to it).</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code blockingSingle} does not operate by default on a particular {@link Scheduler}.</dd>
* <dt><b>Error handling:</b></dt>
* <dd>If the source signals an error, the operator wraps a checked {@link Exception}
* into {@link RuntimeException} and throws that. Otherwise, {@code RuntimeException}s and
* {@link Error}s are rethrown as they are.</dd>
* </dl>
*
* @return the single item emitted by this {@code Flowable}
* @see <a href="http://reactivex.io/documentation/operators/first.html">ReactiveX documentation: First</a>
*/
@CheckReturnValue
@BackpressureSupport(BackpressureKind.UNBOUNDED_IN)
@SchedulerSupport(SchedulerSupport.NONE)
public final T blockingSingle() {
return singleOrError().blockingGet();
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void singleOrErrorError() {
Flowable.error(new RuntimeException("error"))
.singleOrError()
.test()
.assertNoValues()
.assertErrorMessage("error")
.assertError(RuntimeException.class);
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void singleOrErrorNoElement() {
Flowable.empty()
.singleOrError()
.test()
.assertNoValues()
.assertError(NoSuchElementException.class);
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void singleOrError() {
Flowable.empty()
.singleOrError()
.toFlowable()
.test()
.assertFailure(NoSuchElementException.class);
}
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void singleOrErrorMultipleElements() {
Flowable.just(1, 2, 3)
.singleOrError()
.test()
.assertNoValues()
.assertError(IllegalArgumentException.class);
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void singleOrErrorOneElement() {
Flowable.just(1)
.singleOrError()
.test()
.assertNoErrors()
.assertValue(1);
}
代码示例来源:origin: akarnokd/RxJava2Jdk8Interop
/**
* Returns a CompletionStage that signals the single element of the Flowable,
* IllegalArgumentException if the Flowable is longer than 1 element
* or a NoSuchElementException if the Flowable is empty.
* @param <T> the value type
* @return the Function to be used with {@code Flowable.to}.
*/
public static <T> Function<Flowable<T>, CompletionStage<T>> single() {
return f -> {
CompletableFuture<T> cf = new CompletableFuture<>();
f.singleOrError().subscribe(cf::complete, cf::completeExceptionally);
return cf;
};
}
代码示例来源:origin: mqtt-bee/mqtt-bee
@Override
public @NotNull CompletableFuture<@NotNull Mqtt5PublishResult> publish(final @Nullable Mqtt5Publish publish) {
Checks.notNull(publish, "Publish");
return RxFutureConverter.toFuture(delegate.publishHalfSafe(Flowable.just(publish)).singleOrError())
.thenApply(PUBLISH_HANDLER);
}
代码示例来源:origin: tsegismont/vertx-musicstore
private Single<JsonObject> findAlbum(SQLConnection sqlConnection, Long albumId) {
return sqlConnection.rxQueryStreamWithParams(findAlbumById, new JsonArray().add(albumId))
.flatMapPublisher(SQLRowStream::toFlowable)
.map(row -> new JsonObject().put("id", albumId).put("title", row.getString(0)))
.singleOrError();
}
代码示例来源:origin: tsegismont/vertx-musicstore
private Single<JsonObject> findGenre(SQLConnection sqlConnection, Long genreId) {
return sqlConnection.rxQueryStreamWithParams(findGenreById, new JsonArray().add(genreId))
.flatMapPublisher(SQLRowStream::toFlowable)
.map(row -> new JsonObject().put("id", genreId).put("name", row.getString(0)))
.singleOrError();
}
代码示例来源:origin: tsegismont/vertx-musicstore
private Single<JsonObject> findAlbum(SQLConnection sqlConnection, Long albumId) {
return sqlConnection.rxQueryStreamWithParams(findAlbumById, new JsonArray().add(albumId))
.flatMapPublisher(SQLRowStream::toFlowable)
.map(row -> new JsonObject()
.put("id", albumId)
.put("title", row.getString(0))
.put("mbAlbumId", row.getString(1)))
.singleOrError();
}
}
代码示例来源:origin: uk.os.vt/vt-mbtiles
private static synchronized Single<HashMap<String, String>> queryMetadata(Database dataSource) {
final URL url = Resources.getResource("metadata_key_value.sql");
String query;
try {
query = Resources.toString(url, Charsets.UTF_8);
} catch (final IOException ex) {
return Single.error(ex);
}
return dataSource.select(query).get(new ResultSetMapper<HashMap<String, String>>() {
@Override
public HashMap<String, String> apply(@Nonnull ResultSet rs) throws SQLException {
final HashMap<String, String> metadata = new LinkedHashMap<>();
while (rs.getRow() != 0) {
metadata.put(rs.getString("name"), rs.getString("value"));
rs.next();
}
return metadata;
}
}).singleOrError();
}
代码示例来源:origin: mqtt-bee/mqtt-bee
@Override
public @NotNull Mqtt5PublishResult publish(final @Nullable Mqtt5Publish publish) {
Checks.notNull(publish, "Publish");
try {
return handlePublish(delegate.publishUnsafe(Flowable.just(publish)).singleOrError().blockingGet());
} catch (final RuntimeException e) {
throw AsyncRuntimeException.fillInStackTrace(e);
}
}
代码示例来源:origin: com.blackducksoftware.bdio/bdio-tinkerpop
@SuppressWarnings("CheckReturnValue")
public void readGraph(InputStream inputStream, String base, Object expandContext, List<TraversalStrategy<?>> strategies, Graph graph) throws IOException {
// Create a new BDIO document using a new document context derived from the graph context
RxJavaBdioDocument doc = new RxJavaBdioDocument(frame.context().newBuilder().base(base).expandContext(expandContext).build());
// The reader SPI allows for graph implementation specific optimizations
GraphTraversalSource g = graph.traversal().withStrategies(strategies.toArray(new TraversalStrategy<?>[strategies.size()]));
BlackDuckIoReaderSpi spi = BlackDuckIoSpi.getForGraph(graph).reader(g, options, frame, batchSize);
try {
// Read the input stream as sequence of "entries" (individual JSON-LD documents)
Flowable<Object> entries = doc.read(inputStream);
// If we are persisting metadata, create a separate subscription just for that
if (options.metadataLabel().isPresent()) {
entries = entries.publish().autoConnect(2);
doc.metadata(entries).singleOrError().subscribe(spi::persistMetadata, RxJavaPlugins::onError);
}
// Frame the entries and do a blocking persist
doc.jsonLd(entries).frame(frame.serialize()).compose(spi::persistFramedEntries).blockingSubscribe();
} catch (RuntimeException e) {
Throwable failure = unwrap(e);
throwIfInstanceOf(failure, IOException.class);
throwIfUnchecked(failure);
if (failure instanceof NodeDoesNotExistException) {
throw new BlackDuckIoReadGraphException("Failed to load BDIO due to invalid references in the input", failure);
} else if (failure instanceof SQLException) {
throw new BlackDuckIoReadGraphException("Failed to load BDIO due to a database error", failure);
}
// Add a check above and throw a BlackDuckIoReadGraphException with a nice message instead
throw new IllegalStateException("Unexpected checked exception in readGraph", failure);
}
}
内容来源于网络,如有侵权,请联系作者删除!