Usage and code examples of the rx.Observable.defer() method

Reposted by x33g5p2x on 2022-01-25, in category: Other

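This article collects code examples of the rx.Observable.defer() method in Java and shows how Observable.defer() is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they should be useful as reference material. The details of Observable.defer() are as follows:
Package path: rx.Observable
Class name: Observable
Method name: defer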

About Observable.defer

Returns an Observable that calls an Observable factory to create an Observable for each new Observer that subscribes. That is, for each subscriber, the actual Observable that subscriber observes is determined by the factory function.

The defer operator lets you postpone creating the source Observable, and therefore emitting its items, until an Observer subscribes. Because the factory runs again for every subscription, each Observer can easily obtain an updated or refreshed version of the sequence.

Scheduler: defer does not operate by default on a particular Scheduler.
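
The per-subscriber behavior described above can be illustrated without RxJava on the classpath. The following is a minimal sketch in plain Java, not the RxJava implementation: `Source`, `just`, and `defer` are hypothetical stand-ins written only for this illustration and reduced to a single callback. It contrasts a value captured when the source is built with a factory that runs again on each subscription.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical stand-in for rx.Observable; this is NOT the RxJava API.
final class DeferSketch {

    /** A minimal "observable": something that pushes items to a subscriber. */
    interface Source<T> {
        void subscribe(Consumer<T> subscriber);
    }

    /** Analogue of Observable.just: the value is fixed when the source is built. */
    static <T> Source<T> just(T value) {
        return subscriber -> subscriber.accept(value);
    }

    /** Analogue of Observable.defer: the factory runs once per subscription. */
    static <T> Source<T> defer(Supplier<Source<T>> factory) {
        return subscriber -> factory.get().subscribe(subscriber);
    }

    static List<Integer> demo() {
        AtomicInteger counter = new AtomicInteger();

        // The counter is incremented right here, at assembly time.
        Source<Integer> eager = just(counter.incrementAndGet());
        // The counter is incremented later, each time someone subscribes.
        Source<Integer> lazy = defer(() -> just(counter.incrementAndGet()));

        List<Integer> seen = new ArrayList<>();
        eager.subscribe(seen::add);
        eager.subscribe(seen::add);
        lazy.subscribe(seen::add);
        lazy.subscribe(seen::add);
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

In this sketch `demo()` returns `[1, 1, 2, 3]`: both subscribers of the eager source see the value computed at construction, while each subscriber of the deferred source triggers a fresh call to the factory.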

Code examples

Code example source: PipelineAI/pipeline

@Override
final protected Observable<R> getFallbackObservable() {
  return Observable.defer(new Func0<Observable<R>>() {
    @Override
    public Observable<R> call() {
      try {
        return Observable.just(getFallback());
      } catch (Throwable ex) {
        return Observable.error(ex);
      }
    }
  });
}

Code example source: jaydenxiao2016/AndroidFire

public Observable<Bitmap> compressToBitmapAsObservable(final File file) {
  return Observable.defer(new Func0<Observable<Bitmap>>() {
    @Override
    public Observable<Bitmap> call() {
      // Compression runs when an Observer subscribes, not when this method is called
      return Observable.just(compressToBitmap(file));
    }
  });
}

Code example source: jaydenxiao2016/AndroidFire

public Observable<File> compressToFileAsObservable(final File file) {
  return Observable.defer(new Func0<Observable<File>>() {
    @Override
    public Observable<File> call() {
      return Observable.just(compressToFile(file));
    }
  });
}

Code example source: PipelineAI/pipeline

return Observable.defer(new Func0<Observable<ResponseType>>() {
  @Override
  public Observable<ResponseType> call() {

Code example source: greenrobot/greenDAO

/** As of RxJava 1.1.7, Observable.fromCallable is still @Beta, so just in case... */
@Internal
static <T> Observable<T> fromCallable(final Callable<T> callable) {
  return Observable.defer(new Func0<Observable<T>>() {
    @Override
    public Observable<T> call() {
      T result;
      try {
        // Runs at subscription time, once per subscriber
        result = callable.call();
      } catch (Exception e) {
        return Observable.error(e);
      }
      return Observable.just(result);
    }
  });
}
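
Several of the examples above wrap a call that may throw inside defer and turn a caught exception into an error emission. A minimal plain-Java sketch of that pattern follows. It uses nothing from RxJava: `Listener`, `Source`, `just`, `error`, `defer`, and `fromCallable` are hypothetical stand-ins defined only for this illustration, so it shows the shape of the technique rather than any library's implementation.

```java
import java.util.concurrent.Callable;
import java.util.function.Supplier;

// Hypothetical stand-in types; this is NOT the RxJava API.
final class DeferErrorSketch {

    /** Receives either one value or one error, like a much-reduced Observer. */
    interface Listener<T> {
        void onValue(T value);
        void onError(Throwable error);
    }

    interface Source<T> {
        void subscribe(Listener<T> listener);
    }

    static <T> Source<T> just(T value) {
        return listener -> listener.onValue(value);
    }

    static <T> Source<T> error(Throwable t) {
        return listener -> listener.onError(t);
    }

    /** The factory is invoked on every subscription, never at assembly time. */
    static <T> Source<T> defer(Supplier<Source<T>> factory) {
        return listener -> factory.get().subscribe(listener);
    }

    /** Runs the callable per subscription and routes exceptions to onError. */
    static <T> Source<T> fromCallable(Callable<T> callable) {
        return defer(() -> {
            try {
                return just(callable.call());
            } catch (Exception e) {
                return error(e);
            }
        });
    }

    /** Subscribes once and renders what the listener received as a string. */
    static String describe(Source<String> source) {
        StringBuilder out = new StringBuilder();
        source.subscribe(new Listener<String>() {
            @Override
            public void onValue(String value) {
                out.append("value:").append(value);
            }

            @Override
            public void onError(Throwable error) {
                out.append("error:").append(error.getMessage());
            }
        });
        return out.toString();
    }
}
```

With these definitions, `describe(fromCallable(() -> "ok"))` yields `value:ok`, and a callable that throws `new IllegalStateException("boom")` yields `error:boom`. The exception is delivered to the listener instead of escaping from `subscribe`.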

Code example source: PipelineAI/pipeline

@Override
protected Observable<Boolean> resumeWithFallback() {
  return Observable.defer(new Func0<Observable<Boolean>>() {
    @Override
    public Observable<Boolean> call() {
      startLatch.countDown();
      return Observable.just(false);
    }
  });
}

Code example source: hidroh/materialistic

private Observable<Response> execute(Request request) {
  return Observable.defer(() -> {
    try {
      return Observable.just(mCallFactory.newCall(request).execute());
    } catch (IOException e) {
      return Observable.error(e);
    }
  }).subscribeOn(mIoScheduler);
}

Code example source: PipelineAI/pipeline

@Override
final protected Observable<R> getExecutionObservable() {
  return Observable.defer(new Func0<Observable<R>>() {
    @Override
    public Observable<R> call() {
      try {
        return Observable.just(run());
      } catch (Throwable ex) {
        return Observable.error(ex);
      }
    }
  }).doOnSubscribe(new Action0() {
    @Override
    public void call() {
      // Save thread on which we get subscribed so that we can interrupt it later if needed
      executionThread.set(Thread.currentThread());
    }
  });
}

Code example source: PipelineAI/pipeline

protected BucketedCounterStream(final HystrixEventStream<Event> inputEventStream, final int numBuckets, final int bucketSizeInMs,
                final Func2<Bucket, Event, Bucket> appendRawEventToBucket) {
  this.numBuckets = numBuckets;
  this.reduceBucketToSummary = new Func1<Observable<Event>, Observable<Bucket>>() {
    @Override
    public Observable<Bucket> call(Observable<Event> eventBucket) {
      return eventBucket.reduce(getEmptyBucketSummary(), appendRawEventToBucket);
    }
  };
  final List<Bucket> emptyEventCountsToStart = new ArrayList<Bucket>();
  for (int i = 0; i < numBuckets; i++) {
    emptyEventCountsToStart.add(getEmptyBucketSummary());
  }
  this.bucketedStream = Observable.defer(new Func0<Observable<Bucket>>() {
    @Override
    public Observable<Bucket> call() {
      return inputEventStream
          .observe()
          .window(bucketSizeInMs, TimeUnit.MILLISECONDS) //bucket it by the counter window so we can emit to the next operator in time chunks, not on every OnNext
          .flatMap(reduceBucketToSummary)                //for a given bucket, turn it into a long array containing counts of event types
          .startWith(emptyEventCountsToStart);           //start it with empty arrays to make consumer logic as generic as possible (windows are always full)
    }
  });
}

Code example source: hidroh/materialistic

@Override
public void getStories(@FetchMode String filter, @CacheMode int cacheMode,
            final ResponseListener<Item[]> listener) {
  if (listener == null) {
    return;
  }
  Observable.defer(() -> getStoriesObservable(filter, cacheMode))
      .subscribeOn(mIoScheduler)
      .observeOn(mMainThreadScheduler)
      .subscribe(listener::onResponse,
          t -> listener.onError(t != null ? t.getMessage() : ""));
}

Code example source: PipelineAI/pipeline

return Observable.defer(new Func0<Observable<R>>() {
  @Override
  public Observable<R> call() {

Code example source: hidroh/materialistic

@Override
public void parse(String itemId, String url, Callback callback) {
  Observable.defer(() -> fromCache(itemId))
      .subscribeOn(mIoScheduler)
      .flatMap(content -> content != null ?
          Observable.just(content) : fromNetwork(itemId, url))
      .map(content -> AndroidUtils.TextUtils.equals(EMPTY_CONTENT, content) ? null : content)
      .observeOn(mMainThreadScheduler)
      .subscribe(callback::onResponse);
}

Code example source: PipelineAI/pipeline

@Override
protected Observable<Boolean> construct() {
  return Observable.defer(new Func0<Observable<Boolean>>() {
    @Override
    public Observable<Boolean> call() {
      try {
        Thread.sleep(1000);
      } catch (InterruptedException e) {
        System.out.println("Interrupted!");
        e.printStackTrace();
        hasBeenInterrupted = true;
      }
      return Observable.just(hasBeenInterrupted);
    }
  }).subscribeOn(Schedulers.io());
}

Code example source: hidroh/materialistic

@WorkerThread
@Override
public void parse(String itemId, String url) {
  Observable.defer(() -> fromCache(itemId))
      .subscribeOn(Schedulers.immediate())
      .switchIfEmpty(fromNetwork(itemId, url))
      .map(content -> AndroidUtils.TextUtils.equals(EMPTY_CONTENT, content) ? null : content)
      .observeOn(Schedulers.immediate())
      .subscribe();
}

Code example source: PipelineAI/pipeline

private Observable<R> executeCommandWithSpecifiedIsolation(final AbstractCommand<R> _cmd) {
  if (properties.executionIsolationStrategy().get() == ExecutionIsolationStrategy.THREAD) {
    return Observable.defer(new Func0<Observable<R>>() {
      @Override
      public Observable<R> call() {
    return Observable.defer(new Func0<Observable<R>>() {
      @Override
      public Observable<R> call() {

Code example source: ReactiveX/RxNetty

return Observable.defer(new Func0<Observable<Channel>>() {
  @Override
  public Observable<Channel> call() {

Code example source: PipelineAI/pipeline

List<Observable<Integer>> commands = new ArrayList<Observable<Integer>>();
for (int i = 0; i < NUM_COMMANDS; i++) {
  commands.add(Observable.defer(new Func0<Observable<Integer>>() {
    @Override
    public Observable<Integer> call() {

Code example source: PipelineAI/pipeline

Observable.defer(applyHystrixSemantics)
    .map(wrapWithAllOnNextHooks);

Code example source: hidroh/materialistic

break;
Observable.defer(() -> Observable.zip(
    mSessionManager.isViewed(itemId),
    mFavoriteManager.check(itemId),
