rx.Observable.cache()方法的使用及代码示例

x33g5p2x  于2022-01-25 转载在 其他  
字(10.8k)|赞(0)|评价(0)|浏览(172)

本文整理了Java中rx.Observable.cache()方法的一些代码示例,展示了Observable.cache()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Observable.cache()方法的具体详情如下:
包路径:rx.Observable
类名称:Observable
方法名:cache

Observable.cache介绍

[英]Caches the emissions from the source Observable and replays them in order to any subsequent Subscribers. This method has similar behavior to #replay except that this auto-subscribes to the source Observable rather than returning a ConnectableObservable for which you must call connect to activate the subscription.

This is useful when you want an Observable to cache responses and you can't control the subscribe/unsubscribe behavior of all the Subscribers.

When you call cache, it does not yet subscribe to the source Observable and so does not yet begin cacheing items. This only happens when the first Subscriber calls the resulting Observable's subscribe method.

Note: You sacrifice the ability to unsubscribe from the origin when you use the cacheObserver so be careful not to use this Observer on Observables that emit an infinite or very large number of items that will use up memory. Backpressure Support: This operator does not support upstream backpressure as it is purposefully requesting and caching everything emitted. Scheduler: cache does not operate by default on a particular Scheduler.
[中]缓存可观测源的排放,并将其重放,以供后续用户使用。此方法的行为与#replay类似,只是它会自动订阅源Observable,而不是返回必须调用connect才能激活订阅的ConnectableObservable。
当您想要一个可观察的缓存响应,并且无法控制所有订阅者的订阅/取消订阅行为时,这非常有用。
当你调用cache时,它还没有订阅源Observable,因此还没有开始缓存项。只有当第一个订户调用产生的Observable的subscribe方法时,才会发生这种情况。
*注意:*当您使用cacheObserver时,您牺牲了从源站取消订阅的能力,因此请小心不要在发出无限或非常大量项目的观察对象上使用该观察者,这将耗尽内存。背压支持:该操作员不支持上游背压,因为它有意请求并缓存所有发出的内容。调度程序:默认情况下,缓存不会在特定调度程序上运行。

代码示例

代码示例来源:origin: bumptech/glide

.cache();

代码示例来源:origin: davidmoten/rxjava-extras

@Override
public void call(Subscriber<? super T> subscriber) {
  if (refresh.compareAndSet(true, false)) {
    current = source.cache();
  }
  current.unsafeSubscribe(subscriber);
}

代码示例来源:origin: com.github.davidmoten/rxjava-extras

@Override
public void call(Subscriber<? super T> subscriber) {
  if (refresh.compareAndSet(true, false)) {
    current = source.cache();
  }
  current.unsafeSubscribe(subscriber);
}

代码示例来源:origin: leeowenowen/rxjava-examples

@Override
 public void run() {
  Observable o = Observable.range(1, 10).cache();
  o.subscribe(new Action1<Integer>() {
   @Override
   public void call(Integer integer) {
    log("s1:" + integer);
   }
  });
  o.subscribe(new Action1<Integer>() {
   @Override
   public void call(Integer integer) {
    log("s2:" + integer);
   }
  });
 }
});

代码示例来源:origin: org.hawkular.metrics/hawkular-metrics-core-service

@SuppressWarnings("unchecked")
@Override
public <T> Observable<T> findGaugeData(MetricId<Double> id, long start, long end,
                    Func1<Observable<DataPoint<Double>>, Observable<T>>... funcs) {
  Observable<DataPoint<Double>> dataCache = this.findDataPoints(id, start, end, 0, Order.DESC).cache();
  return Observable.from(funcs).flatMap(fn -> fn.call(dataCache));
}

代码示例来源:origin: hawkular/hawkular-metrics

@SuppressWarnings("unchecked")
@Override
public <T> Observable<T> findGaugeData(MetricId<Double> id, long start, long end,
                    Func1<Observable<DataPoint<Double>>, Observable<T>>... funcs) {
  Observable<DataPoint<Double>> dataCache = this.findDataPoints(id, start, end, 0, Order.DESC).cache();
  return Observable.from(funcs).flatMap(fn -> fn.call(dataCache));
}

代码示例来源:origin: Jerey-Jobs/KeepGank

private void loadData(int pager) {
  GankApi.getInstance()
      .getWebService()
      .getBenefitsGoods(GankApi.LOAD_LIMIT, pager)
      .cache()
      .subscribeOn(Schedulers.io())
      .observeOn(AndroidSchedulers.mainThread())
      .subscribe(dataObservable);
}

代码示例来源:origin: com.netflix.eureka/eureka2-core

private Observable<Void> writeWhenSubscribed(final Object message) {
  return connection.writeAndFlush(message)
      .doOnCompleted(new Action0() {
        @Override
        public void call() {
          metrics.incrementOutgoingMessageCounter(message.getClass(), 1);
        }
      })
      .cache();
}

代码示例来源:origin: com.netflix.eureka/eureka2-core

protected AbstractClientChannel(final STATE initState, final TransportClient client, StateMachineMetrics<STATE> metrics) {
  super(initState, metrics);
  this.client = client;
  singleConnection = client.connect()
      .take(1)
      .map(new Func1<MessageConnection, MessageConnection>() {
        @Override
        public MessageConnection call(MessageConnection serverConnection) {
          if (connectionIfConnected == null) {
            connectionIfConnected = serverConnection;
          }
          subscribeToConnectionLifecycle(connectionIfConnected);
          return connectionIfConnected;
        }
      })
      .cache();
}

代码示例来源:origin: nurkiewicz/rxjava-book-examples

@Test
public void sample_177() throws Exception {
  Observable<Integer> ints =
      Observable.<Integer>create(subscriber -> {
            //...
          }
      )
          .cache();
}

代码示例来源:origin: trivago/Heimdall.droid

/**
 * Grants a new access token using the given OAuth2 grant.
 *
 * @param grant    A class implementing the OAuth2Grant interface.
 * @param calendar A calendar instance used to calculate the expiration date of the token.
 * @return - An observable emitting the granted access token.
 */
public Single<TAccessToken> grantNewAccessToken(OAuth2Grant<TAccessToken> grant, Calendar calendar) {
  if (grant == null) {
    throw new IllegalArgumentException("Grant MUST NOT be null.");
  }
  return grant.grantNewAccessToken()
      .doOnSuccess(accessToken -> {
        if (accessToken.expiresIn != null) {
          Calendar expirationDate = (Calendar) calendar.clone();
          expirationDate.add(Calendar.SECOND, accessToken.expiresIn);
          accessToken.expirationDate = expirationDate;
        }
        mStorage.storeAccessToken(accessToken);
      }).toObservable().cache().toSingle();
}

代码示例来源:origin: io.wcm.caravan/io.wcm.caravan.pipeline.impl

JsonPipelineImpl cloneWith(Observable<JsonPipelineOutput> newObservable, String descriptorSuffix, String action, Class actionClass) {
 JsonPipelineImpl clone = new JsonPipelineImpl();
 clone.sourceServiceIds.addAll(this.sourceServiceIds);
 clone.requests.addAll(this.requests);
 clone.descriptor = this.descriptor;
 if (StringUtils.isNotBlank(descriptorSuffix)) {
  clone.descriptor += "+" + descriptorSuffix;
 }
 clone.observable = newObservable.cache();
 clone.context = context;
 clone.performanceMetrics = performanceMetrics.createNext(action, clone.descriptor, actionClass);
 clone.observable = clone.observable
   .doOnSubscribe(clone.performanceMetrics.getStartAction())
   .doOnNext(clone.performanceMetrics.getOnNextAction())
   .doOnTerminate(clone.performanceMetrics.getEndAction());
 return clone;
}

代码示例来源:origin: com.nytimes.android/store

@Nonnull
Observable<Parsed> response(@Nonnull final Key key) {
  return fetcher()
      .fetch(key)
      .flatMap(raw -> persister()
          .write(key, raw)
          .flatMap(aBoolean -> readDisk(key)))
      .onErrorReturn(throwable -> {
        if (stalePolicy == StalePolicy.NETWORK_BEFORE_STALE) {
          return readDisk(key, throwable).toBlocking().first();
        }
        throw Exceptions.propagate(throwable);
      })
      .doOnNext(this::notifySubscribers)
      .doOnTerminate(() -> inFlightRequests.invalidate(key))
      .cache();
}

代码示例来源:origin: pakoito/RxFunctions

@Test
public void testReduce() throws Exception {
  for (int i = 0; i < ITERATIONS; i++) {
    Collections.shuffle(USER_NAMES);
    final Observable<User> userList = Observable.from(USER_NAMES).map(NAME_TO_USER).cache();
    List<User> separated = userList.filter(CORRECT_NAME).filter(IS_RETIRED)
        .filter(IS_HIGH_INCOME).toList().toBlocking().first();
    final Func1<User, Boolean> mergeFilter = RxFunctions.reduce(true, AND, CORRECT_NAME,
        IS_RETIRED, IS_HIGH_INCOME);
    List<User> chained = userList.filter(mergeFilter).toList().toBlocking().first();
    Assert.assertEquals(separated, chained);
  }
}

代码示例来源:origin: pakoito/RxFunctions

@Test
public void testAnd() throws Exception {
  for (int i = 0; i < ITERATIONS; i++) {
    Collections.shuffle(USER_NAMES);
    final Observable<User> userList = Observable.from(USER_NAMES).map(NAME_TO_USER).cache();
    List<User> separated = userList.filter(CORRECT_NAME).filter(IS_RETIRED)
        .filter(IS_HIGH_INCOME).toList().toBlocking().first();
    final Func1<User, Boolean> mergeFilter = RxFunctions.and(CORRECT_NAME, IS_RETIRED,
        IS_HIGH_INCOME);
    List<User> chained = userList.filter(mergeFilter).toList().toBlocking().first();
    Assert.assertEquals(separated, chained);
  }
}

代码示例来源:origin: tehmou/RxJava-code-examples

@Test
 public void testCache() {
  Observer testObserver1 = mock(Observer.class);
  Observer testObserver2 = mock(Observer.class);

  // Create a cached observable that saves all values it receives from
  // the original source and gives the forward to all of its subscribers.
  Observable<Integer> cachedObservable = observable.cache();

  cachedObservable.subscribe(testObserver1);
  cachedObservable.subscribe(testObserver2);

  verify(testObserver1).onNext(0);
  verify(testObserver2).onNext(0);

  // The original observable is still of course there:
  Observer testObserver3 = mock(Observer.class);

  observable.subscribe(testObserver3);

  verify(testObserver3).onNext(1);

  // The other two observers are connected to the cache, and
  // do not receive any values.
  verify(testObserver1, never()).onNext(1);
  verify(testObserver2, never()).onNext(1);
 }
}

代码示例来源:origin: com.nytimes.android/store

Observable<Parsed> readDisk(@Nonnull final Key key, final Throwable error) {
  return persister().read(key)
      .onErrorReturn(throwable -> {
        if (error == null) {
          throw Exceptions.propagate(throwable);
        }
        throw Exceptions.propagate(error);
      })
      .map(raw -> parser.call(key, raw))
      .doOnNext(parsed -> {
        updateMemory(key, parsed);
        if (stalePolicy == StalePolicy.REFRESH_ON_STALE
            && persisterIsStale(key, persister)) {
          backfillCache(key);
        }
      })
      .cache();
}

代码示例来源:origin: nurkiewicz/rxjava-book-examples

@Test
public void sample_291() throws Exception {
  risky().cache().retry();  //BROKEN
}

代码示例来源:origin: io.wcm.caravan/io.wcm.caravan.pipeline.impl

/**
 * @param request the REST request that provides the source data
 * @param responseObservable the response observable obtained by the {@link CaravanHttpClient}
 * @param context preinitialized JSON pipeline context
 */
public JsonPipelineImpl(final CaravanHttpRequest request, final Observable<CaravanHttpResponse> responseObservable, final JsonPipelineContextImpl context) {
 if (isNotBlank(request.getServiceId())) {
  this.sourceServiceIds.add(request.getServiceId());
 }
 this.requests.add(request);
 this.descriptor = isNotBlank(request.getUrl()) ? "GET(//" + request.getServiceId() + request.getUrl() + ")" : "EMPTY()";
 this.observable = responseObservable.lift(new ResponseHandlingOperator(request)).cache();
 this.context = context;
 if (request.getPerformanceMetrics() != null) {
  this.performanceMetrics = request.getPerformanceMetrics().createNext(isNotBlank(request.getUrl()) ? "GET" : "EMPTY", descriptor);
 } else {
  this.performanceMetrics = PerformanceMetrics.createNew(isNotBlank(request.getUrl()) ? "GET" : "EMPTY", descriptor, request.getCorrelationId());
 }
 this.observable = this.observable
   .doOnSubscribe(this.performanceMetrics.getStartAction())
   .doOnNext(this.performanceMetrics.getOnNextAction())
   .doOnTerminate(this.performanceMetrics.getEndAction());
}

代码示例来源:origin: nurkiewicz/rxjava-book-examples

@Test
public void sample_113() throws Exception {
  Single<String> single = Single.create(subscriber -> {
    System.out.println("Subscribing");
    subscriber.onSuccess("42");
  });
  Single<String> cachedSingle = single
      .toObservable()
      .cache()
      .toSingle();
  cachedSingle.subscribe(System.out::println);
  cachedSingle.subscribe(System.out::println);
}

相关文章

Observable类方法