rx.Observable.single()方法的使用及代码示例

x33g5p2x  于2022-01-25 转载在 其他  
字(7.2k)|赞(0)|评价(0)|浏览(183)

本文整理了Java中rx.Observable.single()方法的一些代码示例,展示了Observable.single()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Observable.single()方法的具体详情如下:
包路径:rx.Observable
类名称:Observable
方法名:single

Observable.single介绍

[英]Returns an Observable that emits the single item emitted by the source Observable, if that Observable emits only a single item. If the source Observable emits more than one item or no items, notify of an IllegalArgumentException or NoSuchElementException respectively.

Scheduler: single does not operate by default on a particular Scheduler.
[中]返回一个可观察对象,如果该可观察对象仅发射单个项,则该可观察对象发射源可观察对象发射的单个项。如果源Observable发出多个项或没有项,则分别通知IllegalArgumentException或NoSuchElementException。
调度程序:默认情况下,single不会在特定调度程序上运行。

代码示例

代码示例来源:origin: PipelineAI/pipeline

  1. @Override
  2. public Observable<Void> mapResponseToRequests(Observable<BatchReturnType> batchResponse, final Collection<CollapsedRequest<ResponseType, RequestArgumentType>> requests) {
  3. return batchResponse.single().doOnNext(new Action1<BatchReturnType>() {
  4. @Override
  5. public void call(BatchReturnType batchReturnType) {
  6. // this is a blocking call in HystrixCollapser
  7. self.mapResponseToRequests(batchReturnType, requests);
  8. }
  9. }).ignoreElements().cast(Void.class);
  10. }

代码示例来源:origin: HotelsDotCom/styx

  1. public static <T> CompletableFuture<T> fromSingleObservable(Observable<T> observable) {
  2. CompletableFuture<T> future = new CompletableFuture<>();
  3. observable.single().subscribe(future::complete, future::completeExceptionally);
  4. return future;
  5. }

代码示例来源:origin: leeowenowen/rxjava-examples

  1. @Override
  2. public void run() {
  3. Observable.just(1).single().subscribe(new Action1<Integer>() {
  4. @Override
  5. public void call(Integer integer) {
  6. log(integer);
  7. }
  8. });
  9. }
  10. });

代码示例来源:origin: com.couchbase.client/java-client

  1. @Override
  2. public N1qlQueryResult query(N1qlQuery query, long timeout, TimeUnit timeUnit) {
  3. return Blocking.blockForSingle(
  4. couchbaseAsyncCluster
  5. .query(query)
  6. .flatMap(N1qlQueryExecutor.ASYNC_RESULT_TO_SYNC)
  7. .single(), timeout, timeUnit);
  8. }

代码示例来源:origin: Qkyrie/spring-boot-netflix-example

  1. @RequestMapping(method = GET, value = "/status")
  2. public DeferredResult<String> getStatusPageUrl() {
  3. DeferredResult<String> result = new DeferredResult<>();
  4. notificationService.statusPageUrl().single()
  5. .subscribe(
  6. result::setResult,
  7. result::setErrorResult
  8. );
  9. return result;
  10. }

代码示例来源:origin: alaisi/postgres-async-driver

  1. Observable<Connection> connect(String username, String password, String database) {
  2. return stream.connect(new StartupMessage(username, database))
  3. .flatMap(message -> authenticate(username, password, message))
  4. .single(message -> message == ReadyForQuery.INSTANCE)
  5. .map(ready -> this);
  6. }

代码示例来源:origin: com.couchbase.client/java-client

  1. @Override
  2. public BucketSettings updateBucket(BucketSettings settings, long timeout, TimeUnit timeUnit) {
  3. return Blocking.blockForSingle(asyncClusterManager.updateBucket(settings).single(), timeout, timeUnit);
  4. }

代码示例来源:origin: com.couchbase.client/java-client

  1. @Override
  2. public DesignDocument upsertDesignDocument(final DesignDocument designDocument, final long timeout,
  3. final TimeUnit timeUnit) {
  4. return Blocking.blockForSingle(
  5. asyncBucketManager.upsertDesignDocument(designDocument).single(), timeout, timeUnit
  6. );
  7. }

代码示例来源:origin: com.couchbase.client/java-client

  1. @Override
  2. public DesignDocument insertDesignDocument(final DesignDocument designDocument, final long timeout,
  3. final TimeUnit timeUnit) {
  4. return Blocking.blockForSingle(
  5. asyncBucketManager.insertDesignDocument(designDocument).single(), timeout, timeUnit
  6. );
  7. }

代码示例来源:origin: com.couchbase.client/java-client

  1. @Override
  2. public DesignDocument insertDesignDocument(final DesignDocument designDocument, final boolean development,
  3. final long timeout, final TimeUnit timeUnit) {
  4. return Blocking.blockForSingle(
  5. asyncBucketManager.insertDesignDocument(designDocument, development).single(), timeout, timeUnit
  6. );
  7. }

代码示例来源:origin: com.couchbase.client/java-client

  1. @Override
  2. public Boolean removeDesignDocument(final String name, final long timeout, final TimeUnit timeUnit) {
  3. return Blocking.blockForSingle(
  4. asyncBucketManager.removeDesignDocument(name).single(), timeout, timeUnit
  5. );
  6. }

代码示例来源:origin: com.couchbase.client/java-client

  1. @Override
  2. public DesignDocument publishDesignDocument(final String name, final long timeout, final TimeUnit timeUnit) {
  3. return Blocking.blockForSingle(
  4. asyncBucketManager.publishDesignDocument(name).single(), timeout, timeUnit
  5. );
  6. }

代码示例来源:origin: com.couchbase.client/java-client

  1. @Override
  2. public DesignDocument publishDesignDocument(final String name, final boolean overwrite, final long timeout,
  3. final TimeUnit timeUnit) {
  4. return Blocking.blockForSingle(
  5. asyncBucketManager.publishDesignDocument(name, overwrite).single(), timeout, timeUnit
  6. );
  7. }

代码示例来源:origin: com.github.alaisi.pgasync/postgres-async-driver

  1. Observable<Connection> connect(String username, String password, String database) {
  2. return stream.connect(new StartupMessage(username, database))
  3. .flatMap(message -> authenticate(username, password, message))
  4. .single(message -> message == ReadyForQuery.INSTANCE)
  5. .map(ready -> this);
  6. }

代码示例来源:origin: com.microsoft.azure/azure-cosmosdb-gateway

  1. protected Observable<DocumentProducerFeedResponse> produceOnSplit(Observable<DocumentProducer<T>> replacementProducers) {
  2. Observable<DocumentProducerFeedResponse> res = replacementProducers.toList().single().flatMap(documentProducers -> {
  3. RequestChargeTracker tracker = new RequestChargeTracker();
  4. return OrderByUtils.orderedMerge(resourceType, consumeComparer, tracker, documentProducers)
  5. .map(orderByQueryResult -> resultPageFrom(tracker, orderByQueryResult));
  6. });
  7. return res;
  8. }

代码示例来源:origin: com.couchbase.client/java-client

  1. @Override
  2. public List<User> getUsers(AuthDomain domain, long timeout, TimeUnit timeUnit) {
  3. return Blocking.blockForSingle(asyncClusterManager.getUsers(domain).toList().single(), timeout, timeUnit);
  4. }

代码示例来源:origin: com.netflix.hystrix/hystrix-core

  1. @Override
  2. public Observable<Void> mapResponseToRequests(Observable<BatchReturnType> batchResponse, final Collection<CollapsedRequest<ResponseType, RequestArgumentType>> requests) {
  3. return batchResponse.single().doOnNext(new Action1<BatchReturnType>() {
  4. @Override
  5. public void call(BatchReturnType batchReturnType) {
  6. // this is a blocking call in HystrixCollapser
  7. self.mapResponseToRequests(batchReturnType, requests);
  8. }
  9. }).ignoreElements().cast(Void.class);
  10. }

代码示例来源:origin: nurkiewicz/rxjava-book-examples

  1. @Test
  2. public void sample_122() throws Exception {
  3. Observable<BigDecimal> totalPrice = Observable
  4. .just("bread", "butter", "milk", "tomato", "cheese")
  5. .subscribeOn(schedulerA) //BROKEN!!!
  6. .map(prod -> rxGroceries.doPurchase(prod, 1))
  7. .reduce(BigDecimal::add)
  8. .single();
  9. }

代码示例来源:origin: nurkiewicz/rxjava-book-examples

  1. @Test
  2. public void sample_135() throws Exception {
  3. final Observable<BigDecimal> totalPrice = Observable
  4. .just("bread", "butter", "milk", "tomato", "cheese")
  5. .subscribeOn(schedulerA)
  6. .flatMap(prod -> rxGroceries.purchase(prod, 1))
  7. .reduce(BigDecimal::add)
  8. .single();
  9. }

代码示例来源:origin: nurkiewicz/rxjava-book-examples

  1. @Test
  2. public void sample_145() throws Exception {
  3. Observable<BigDecimal> totalPrice = Observable
  4. .just("bread", "butter", "milk", "tomato", "cheese")
  5. .flatMap(prod ->
  6. rxGroceries
  7. .purchase(prod, 1)
  8. .subscribeOn(schedulerA))
  9. .reduce(BigDecimal::add)
  10. .single();
  11. }

相关文章

Observable类方法