rx.Observable.zip()方法的使用及代码示例

x33g5p2x  于2022-01-25 转载在 其他  
字(9.5k)|赞(0)|评价(0)|浏览(296)

本文整理了Java中rx.Observable.zip()方法的一些代码示例,展示了Observable.zip()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Observable.zip()方法的具体详情如下:
包路径:rx.Observable
类名称:Observable
方法名:zip

Observable.zip介绍

[英]Returns an Observable that emits items that are the result of applying a specified function to pairs of values, one each from the source Observable and a specified Iterable sequence.

Note that the other Iterable is evaluated as items are observed from the source Observable; it is not pre-consumed. This allows you to zip infinite streams on either side. Scheduler: zip does not operate by default on a particular Scheduler.
[中]返回一个Observable,它发出的项是将指定函数应用于成对值的结果,每个值来自源Observable和指定的Iterable序列。
请注意,另一个Iterable是从源可观察项中观察到的;它不是预先消费的。这允许你在两边压缩无限的流。调度器:默认情况下,zip不会在特定的调度器上运行。

代码示例

代码示例来源:origin: amitshekhariitbhu/Fast-Android-Networking

@Override
  public Observable<Pair<UserDetail, User>> call(User user) {
    // here we get the user one by one and then we are zipping
    // two observable - one getUserDetailObservable (network call to get userDetail)
    // and another Observable.just(user) - just to emit user
    return Observable.zip(getUserDetailObservable(user.id), // zip to combine two observable
        Observable.just(user),
        new Func2<UserDetail, User, Pair<UserDetail, User>>() {
          @Override
          public Pair<UserDetail, User> call(UserDetail userDetail, User user) {
            // runs when network call completes
            // we get here userDetail for the corresponding user
            return new Pair<>(userDetail, user); // returning the pair(userDetail, user)
          }
        });
  }
})

代码示例来源:origin: apache/usergrid

public Observable<CollectionIoEvent<MvccEntity>> stageRunner( CollectionIoEvent<Entity> writeData,
                               WriteStart writeState ) {
  return Observable.just( writeData ).map( writeState ).flatMap( mvccEntityCollectionIoEvent -> {
    Observable<CollectionIoEvent<MvccEntity>> uniqueObservable =
      Observable.just( mvccEntityCollectionIoEvent ).subscribeOn( rxTaskScheduler.getAsyncIOScheduler() )
        .doOnNext( writeVerifyUnique );
    // optimistic verification
    Observable<CollectionIoEvent<MvccEntity>> optimisticObservable =
      Observable.just( mvccEntityCollectionIoEvent ).subscribeOn( rxTaskScheduler.getAsyncIOScheduler() )
        .doOnNext( writeOptimisticVerify );
    final Observable<CollectionIoEvent<MvccEntity>> zip =
      Observable.zip( uniqueObservable, optimisticObservable, ( unique, optimistic ) -> optimistic );
    return zip;
  } );
}

代码示例来源:origin: amitshekhariitbhu/Fast-Android-Networking

private void findUsersWhoLovesBoth() {
  Observable.zip(getCricketFansObservable(), getFootballFansObservable(),
      new Func2<List<User>, List<User>, List<User>>() {
        @Override

代码示例来源:origin: apache/usergrid

Observable<Integer> zipped = Observable.zip( functions, new FuncN<Integer>() {

代码示例来源:origin: jooby-project/jooby

@Override
public <T> Observable<List<T>> query(final N1qlQuery query) {
 return bucket.query(query)
   .flatMap(aqr -> Observable.zip(aqr.rows().toList(),
     aqr.errors().toList(),
     aqr.finalSuccess().singleOrDefault(Boolean.FALSE),
     (rows, errors, finalSuccess) -> {
      if (!finalSuccess) {
       throw new QueryExecutionException(
         "execution of query resulted in exception: ",
         Try.apply(() -> errors.get(0)).orElse(null));
      }
      List<T> value = new ArrayList<>();
      for (AsyncN1qlQueryRow row : rows) {
       try {
        T v = converter.fromBytes(row.byteValue());
        value.add(v);
       } catch (IOException ex) {
        throw new QueryExecutionException(
          "execution of query resulted in exception", null, ex);
       }
      }
      return value;
     }));
}

代码示例来源:origin: hidroh/materialistic

break;
Observable.defer(() -> Observable.zip(
    mSessionManager.isViewed(itemId),
    mFavoriteManager.check(itemId),

代码示例来源:origin: PipelineAI/pipeline

Observable<String> zipped = Observable.zip(error, cmdResult, new Func2<String, Integer, String>() {
  @Override
  public String call(String s, Integer integer) {

代码示例来源:origin: PipelineAI/pipeline

Observable<Boolean> checkZippedEqual = Observable.zip(fast, slow, new Func2<HystrixUtilization, HystrixUtilization, Boolean>() {
  @Override
  public Boolean call(HystrixUtilization payload, HystrixUtilization payload2) {

代码示例来源:origin: jhusain/learnrxjava

/**
 * Combine 2 streams into pairs using zip.
 * 
 * a -> "one", "two", "red", "blue"
 * b -> "fish", "fish", "fish", "fish"
 * output -> "one fish", "two fish", "red fish", "blue fish"
 */
public Observable<String> exerciseZip(Observable<String> a, Observable<String> b) {
  return Observable.zip(a, b, (x, y) -> x + " " + y);
}

代码示例来源:origin: PipelineAI/pipeline

Observable<Boolean> checkZippedEqual = Observable.zip(fast, slow, new Func2<HystrixConfiguration, HystrixConfiguration, Boolean>() {
  @Override
  public Boolean call(HystrixConfiguration payload, HystrixConfiguration payload2) {

代码示例来源:origin: PipelineAI/pipeline

Observable<Boolean> checkZippedEqual = Observable.zip(fast, slow, new Func2<HystrixDashboardStream.DashboardData, HystrixDashboardStream.DashboardData, Boolean>() {
  @Override
  public Boolean call(HystrixDashboardStream.DashboardData payload, HystrixDashboardStream.DashboardData payload2) {

代码示例来源:origin: PipelineAI/pipeline

.observeOn(Schedulers.computation());
Observable<Boolean> zipped = Observable.zip(o1, o2, new Func2<HystrixCommandMetrics.HealthCounts, HystrixCommandMetrics.HealthCounts, Boolean>() {
  @Override
  public Boolean call(HystrixCommandMetrics.HealthCounts healthCounts, HystrixCommandMetrics.HealthCounts healthCounts2) {

代码示例来源:origin: itmuch/spring-cloud-docker-microservice-book-code

public Observable<HashMap<String, User>> aggregateObservable(Long id) {
 // 合并两个或者多个Observables发射出的数据项,根据指定的函数变换它们
 return Observable.zip(
     this.aggregationService.getUserById(id),
     this.aggregationService.getMovieUserByUserId(id),
     (user, movieUser) -> {
      HashMap<String, User> map = Maps.newHashMap();
      map.put("user", user);
      map.put("movieUser", movieUser);
      return map;
     }
 );
}

代码示例来源:origin: THEONE10211024/RxJavaSamples

@Override
  public Observable<Pair<User, Contributor>> call(Contributor contributor) {
    Observable<User> _userObservable = _api.user(contributor.login)
       .filter(new Func1<User, Boolean>() {
         @Override
         public Boolean call(User user) {
           return !isEmpty(user.name) && !isEmpty(user.email);
         }
       });
    return Observable.zip(_userObservable,
       Observable.just(contributor),
       new Func2<User, Contributor, Pair<User, Contributor>>() {
         @Override
         public Pair<User, Contributor> call(User user,
                           Contributor contributor) {
           return new Pair<>(user, contributor);
         }
       });
  }
})

代码示例来源:origin: jhusain/learnrxjava

public static void main(String... args) {
    Observable<String> data = Observable.just("one", "two", "three", "four", "five");
    Observable.zip(data, Observable.interval(1, TimeUnit.SECONDS), (d, t) -> {
      return d + " " + t;
    }).toBlocking().forEach(System.out::println);
    
  }
}

代码示例来源:origin: cn-ljb/rxjava_for_android

private void getContactData() {
  Observable.zip(
      queryContactsFromLocation(),
      queryContactsForNet(),
      new Func2<List<Contacter>, List<Contacter>, List<Contacter>>() {
        @Override
        public List<Contacter> call(List<Contacter> contacters, List<Contacter> contacters2) {
          contacters.addAll(contacters2);
          return contacters;
        }
      }
  ).compose(this.<List<Contacter>>bindToLifecycle())
      .subscribeOn(Schedulers.io())
      .observeOn(AndroidSchedulers.mainThread())
      .subscribe(new Action1<List<Contacter>>() {
        @Override
        public void call(List<Contacter> contacters) {
          initPage(contacters);
        }
      });
}

代码示例来源:origin: jhusain/learnrxjava

public static void run() {
  Observable<Tile> searchTile = getSearchResults("search term");
  Observable<TileResponse> populatedTiles = searchTile.flatMap(t -> {
    Observable<Reviews> reviews = getSellerReviews(t.getSellerId());
    Observable<String> imageUrl = getProductImage(t.getProductId());
    return Observable.zip(reviews, imageUrl, (r, u) -> {
      return new TileResponse(t, r, u);
    });
  });
  List<TileResponse> allTiles = populatedTiles.toList()
      .toBlocking().single();
}

代码示例来源:origin: jhusain/learnrxjava

public static void main(String[] args) {
  final long startTime = System.currentTimeMillis();
  Observable<Tile> searchTile = getSearchResults("search term")
      .doOnSubscribe(() -> logTime("Search started ", startTime))
      .doOnCompleted(() -> logTime("Search completed ", startTime));
  Observable<TileResponse> populatedTiles = searchTile.flatMap(t -> {
    Observable<Reviews> reviews = getSellerReviews(t.getSellerId())
        .doOnCompleted(() -> logTime("getSellerReviews[" + t.id + "] completed ", startTime));
    Observable<String> imageUrl = getProductImage(t.getProductId())
        .doOnCompleted(() -> logTime("getProductImage[" + t.id + "] completed ", startTime));
    return Observable.zip(reviews, imageUrl, (r, u) -> {
      return new TileResponse(t, r, u);
    }).doOnCompleted(() -> logTime("zip[" + t.id + "] completed ", startTime));
  });
  List<TileResponse> allTiles = populatedTiles.toList()
      .doOnCompleted(() -> logTime("All Tiles Completed ", startTime))
      .toBlocking().single();
}

代码示例来源:origin: MaksTuev/ferro

private void imitateDownloading(String bookId) {
  List<Integer> percents = Arrays.asList(5, 17, 33, 50, 66, 81, 92, 100);
  Observable.zip(
      Observable.interval(600, TimeUnit.MILLISECONDS),
      Observable.from(percents),
      (t, percent) -> percent)
      .doOnNext(percent -> updateBook(bookId, percent))
      .subscribe();
}

代码示例来源:origin: gitskarios/GithubAndroidSdk

private Observable<IssueStory> getIssueStory() {
  return Observable.zip(getIssueObservable(), getIssueDetailsObservable(), (issue, details) -> {
    IssueStory issueStory = new IssueStory();
    issueStory.item = issue;
    issueStory.details = details;
    Collections.sort(issueStory.details,
        IssueStoryComparators.ISSUE_STORY_DETAIL_COMPARATOR);
    return issueStory;
  });
}

相关文章

Observable类方法