[英]Returns a Single that emits only the very first item emitted by this Flowable or signals a NoSuchElementException if this Flowable is empty.
Backpressure: The operator honors backpressure from downstream and consumes the source Publisher in an unbounded manner (i.e., without applying backpressure). Scheduler: firstOrError does not operate by default on a particular Scheduler.
代码示例来源:origin: ReactiveX/RxJava
public void noReentrantDispose() {
final AtomicInteger cancelCalled = new AtomicInteger();
final BehaviorProcessor<Integer> p = BehaviorProcessor.create();
p.doOnCancel(new Action() {
public void run() throws Exception {
assertEquals(1, cancelCalled.get());
代码示例来源:origin: ReactiveX/RxJava
public void firstOrErrorError() {
Flowable.error(new RuntimeException("error"))
代码示例来源:origin: apache/incubator-gobblin
public void testBase(InstrumentedExtractorBase<String, String> extractor)
throws DataRecordException, IOException {
RecordStreamWithMetadata<String, String> stream = extractor.recordStream(new AtomicBoolean(false));
RecordEnvelope<String> r = (RecordEnvelope<String>) stream.getRecordStream().firstOrError().blockingGet();
Map<String, Long> metrics = MetricsHelper.dumpMetrics(extractor.getMetricContext());
Assert.assertEquals(metrics.get(MetricNames.ExtractorMetrics.RECORDS_READ_METER), Long.valueOf(1));
Assert.assertEquals(metrics.get(MetricNames.ExtractorMetrics.RECORDS_FAILED_METER), Long.valueOf(0));
Assert.assertEquals(metrics.get(MetricNames.ExtractorMetrics.EXTRACT_TIMER), Long.valueOf(1));
代码示例来源:origin: ReactiveX/RxJava
public void firstOrErrorMultipleElements() {
Flowable.just(1, 2, 3)
代码示例来源:origin: ReactiveX/RxJava
public void firstOrErrorErrorFlowable() {
Flowable.error(new RuntimeException("error"))
代码示例来源:origin: ReactiveX/RxJava
public void firstOrErrorNoElement() {
代码示例来源:origin: ReactiveX/RxJava
public void firstOrErrorOneElement() {
代码示例来源:origin: TeamNewPipe/NewPipe
public Completable updateChannelInfo(final ChannelInfo info) {
final Function<List<SubscriptionEntity>, CompletableSource> update = new Function<List<SubscriptionEntity>, CompletableSource>() {
public CompletableSource apply(@NonNull List<SubscriptionEntity> subscriptionEntities) {
if (DEBUG) Log.d(TAG, "updateChannelInfo() called with: subscriptionEntities = [" + subscriptionEntities + "]");
if (subscriptionEntities.size() == 1) {
SubscriptionEntity subscription = subscriptionEntities.get(0);
// Subscriber count changes very often, making this check almost unnecessary.
// Consider removing it later.
if (!isSubscriptionUpToDate(info, subscription)) {
subscription.setData(info.getName(), info.getAvatarUrl(), info.getDescription(), info.getSubscriberCount());
return Completable.fromRunnable(() -> subscriptionTable().update(subscription));
return Completable.complete();
return subscriptionTable().getSubscription(info.getServiceId(), info.getUrl())
代码示例来源:origin: ReactiveX/RxJava
public void firstOrErrorNoElementFlowable() {
代码示例来源:origin: ReactiveX/RxJava
public void firstOrErrorMultipleElementsFlowable() {
Flowable.just(1, 2, 3)
代码示例来源:origin: ReactiveX/RxJava
public void firstOrErrorOneElementFlowable() {
代码示例来源:origin: Polidea/RxAndroidBle
代码示例来源:origin: akarnokd/RxJava2Jdk8Interop
* Returns a CompletionStage that signals the first element of the Flowable
* or a NoSuchElementException if the Flowable is empty.
* @param <T> the value type
* @return the Function to be used via {@code Flowable.to}.
public static <T> Function<Flowable<T>, CompletionStage<T>> first() {
return f -> {
CompletableFuture<T> cf = new CompletableFuture<>();
f.firstOrError().subscribe(cf::complete, cf::completeExceptionally);
return cf;
代码示例来源:origin: io.smallrye.reactive/smallrye-converter-rxjava2
public <X> Single fromPublisher(Publisher<X> publisher) {
return Flowable.fromPublisher(publisher).firstOrError();
代码示例来源:origin: instacart/truetime-android
* Initialize TrueTime
* Use this if you want to resolve the NTP Pool address to individual IPs yourself
* See https://github.com/instacart/truetime-android/issues/42
* to understand why you may want to do something like this.
* @param resolvedNtpAddresses list of resolved IP addresses for an NTP
* @return Observable of detailed long[] containing most important parts of the actual NTP response
* See RESPONSE_INDEX_ prefixes in {@link SntpClient} for details
public Single<long[]> initializeNtp(List<InetAddress> resolvedNtpAddresses) {
return Flowable.fromIterable(resolvedNtpAddresses)
代码示例来源:origin: instacart/truetime-android
* Initialize TrueTime
* A single NTP pool server is provided.
* Using DNS we resolve that to multiple IP hosts (See {@link #initializeNtp(List)} for manually resolved IPs)
* Use this instead of {@link #initializeRx(String)} if you wish to also get additional info for
* instrumentation/tracking actual NTP response data
* @param ntpPool NTP pool server e.g. time.apple.com, 0.us.pool.ntp.org
* @return Observable of detailed long[] containing most important parts of the actual NTP response
* See RESPONSE_INDEX_ prefixes in {@link SntpClient} for details
public Single<long[]> initializeNtp(String ntpPool) {
return Flowable
代码示例来源:origin: TrustWallet/trust-wallet-android-source
public Single<Wallet> find() {
return walletRepository
.to(single -> Flowable.fromArray(single.blockingGet()))