rx.Observable.combineLatest()方法的使用及代码示例

x33g5p2x  于2022-01-25 转载在 其他  
字(14.4k)|赞(0)|评价(0)|浏览(192)

本文整理了Java中rx.Observable.combineLatest()方法的一些代码示例,展示了Observable.combineLatest()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Observable.combineLatest()方法的具体详情如下:
包路径:rx.Observable
类名称:Observable
方法名:combineLatest

Observable.combineLatest介绍

[英]Combines a list of source Observables by emitting an item that aggregates the latest values of each of the source Observables each time an item is received from any of the source Observables, where this aggregation is defined by a specified function. Scheduler: combineLatest does not operate by default on a particular Scheduler.
[中]通过发出一个项来组合源可观测值列表,该项在每次从任何源可观测值接收到一个项时聚合每个源可观测值的最新值,其中该聚合由指定函数定义。调度器:默认情况下,CombineTest不会在特定的调度器上运行。

代码示例

代码示例来源:origin: konmik/nucleus

  1. @Override
  2. public Observable<Delivery<View, T>> call(Observable<T> observable) {
  3. return Observable
  4. .combineLatest(
  5. view,
  6. observable
  7. .materialize()
  8. .filter(new Func1<Notification<T>, Boolean>() {
  9. @Override
  10. public Boolean call(Notification<T> notification) {
  11. return !notification.isOnCompleted();
  12. }
  13. }),
  14. new Func2<View, Notification<T>, Delivery<View, T>>() {
  15. @Override
  16. public Delivery<View, T> call(View view, Notification<T> notification) {
  17. return view == null ? null : new Delivery<>(view, notification);
  18. }
  19. })
  20. .filter(new Func1<Delivery<View, T>, Boolean>() {
  21. @Override
  22. public Boolean call(Delivery<View, T> delivery) {
  23. return delivery != null;
  24. }
  25. });
  26. }
  27. }

代码示例来源:origin: BaronZ88/MinimalistWeather

  1. Observable<EnvironmentCloudCityAirLive> airLiveObservable = ApiClient.environmentCloudWeatherService.getAirLive(cityId);
  2. observableForGetWeatherFromNetWork = Observable.combineLatest(weatherLiveObservable, forecastObservable, airLiveObservable,
  3. (weatherLive, forecast, airLive) -> new CloudWeatherAdapter(weatherLive, forecast, airLive).getWeather());

代码示例来源:origin: THEONE10211024/RxJavaSamples

  1. private void _combineLatestEvents() {
  2. _subscription = Observable.combineLatest(_emailChangeObservable,
  3. _passwordChangeObservable,
  4. _numberChangeObservable,

代码示例来源:origin: ribot/ribot-app-android

  1. public void loadVenues() {
  2. getMvpView().showVenuesProgress(true);
  3. mSubscriptions.add(Observable.combineLatest(
  4. getTodayLatestCheckInAtVenue().defaultIfEmpty(null), mDataManager.getVenues(),
  5. new Func2<CheckIn, List<Venue>, VenuesInfo>() {
  6. @Override
  7. public VenuesInfo call(CheckIn checkIn, List<Venue> venues) {
  8. return new VenuesInfo(checkIn, venues);
  9. }
  10. })
  11. .observeOn(AndroidSchedulers.mainThread())
  12. .subscribeOn(Schedulers.io())
  13. .subscribe(new Subscriber<VenuesInfo>() {
  14. @Override
  15. public void onCompleted() {
  16. }
  17. @Override
  18. public void onError(Throwable e) {
  19. Timber.e("Error loading venues " + e);
  20. getMvpView().showVenuesProgress(false);
  21. }
  22. @Override
  23. public void onNext(VenuesInfo venuesInfo) {
  24. getMvpView().showVenues(
  25. venuesInfo.listVenues,
  26. venuesInfo.getTodayLatestCheckInAtVenueId());
  27. getMvpView().showVenuesProgress(false);
  28. }
  29. }));
  30. }

代码示例来源:origin: jacek-marchwicki/JavaWebsocketClient

  1. @Nonnull
  2. public Observable<ItemsWithScroll> itemsWithScrollObservable() {
  3. return Observable.combineLatest(items, lastItemInView, new Func2<ImmutableList<MainPresenter.AdapterItem>, Boolean, ItemsWithScroll>() {
  4. @Override
  5. public ItemsWithScroll call(ImmutableList<MainPresenter.AdapterItem> adapterItems, Boolean isLastItemInList) {
  6. final int lastItemPosition = adapterItems.size() - 1;
  7. final boolean shouldScroll = isLastItemInList && lastItemPosition >= 0;
  8. return new ItemsWithScroll(adapterItems, shouldScroll, lastItemPosition);
  9. }
  10. });
  11. }

代码示例来源:origin: henrymorgen/android-advanced-light

  1. private void combineLastest() {
  2. Observable<Integer> obs1 =Observable.just(1,2,3);
  3. Observable<String> obs2 =Observable.just("a","b","c");
  4. Observable.combineLatest(obs1, obs2, new Func2<Integer, String, String>() {
  5. @Override
  6. public String call(Integer integer, String s) {
  7. return integer+s;
  8. }
  9. }).subscribe(new Action1<String>() {
  10. @Override
  11. public void call(String s) {
  12. Log.d(TAG, "combineLastest:"+s);
  13. }
  14. });
  15. }

代码示例来源:origin: xiaolongonly/Ticket-Analysis

  1. public static Subscription getLimitEnableStateSubscription(View enableView, FuncN<Boolean> funcN, EditText... editTexts) {
  2. List<Observable<CharSequence>> observableList = new ArrayList<>();
  3. for (EditText editText : editTexts) {
  4. observableList.add(RxTextView.textChanges(editText));
  5. }
  6. return Observable.combineLatest(observableList, funcN)
  7. .observeOn(AndroidSchedulers.mainThread())
  8. .subscribe(isEnable -> enableView.setEnabled(isEnable), throwable -> enableView.setEnabled(false));
  9. }
  10. }

代码示例来源:origin: com.netflix.eureka/eureka2-server

  1. public SelfInfoResolverChain(ChainableSelfInfoResolver... resolvers) {
  2. super(Observable.combineLatest(getObservableList(resolvers), new FuncN<InstanceInfo.Builder>() {
  3. @Override
  4. public InstanceInfo.Builder call(Object... args) {
  5. InstanceInfo.Builder seed = new InstanceInfo.Builder();
  6. for (Object obj : args) {
  7. InstanceInfo.Builder builder = (InstanceInfo.Builder) obj;
  8. seed.withBuilder(new InstanceInfo.Builder().withBuilder(builder)); // clone at each step
  9. }
  10. return seed;
  11. }
  12. })
  13. );
  14. }

代码示例来源:origin: com.netflix.rxjava/rxjava-core

  1. /**
  2. * Combines two source Observables by emitting an item that aggregates the latest values of each of the
  3. * source Observables each time an item is received from either of the source Observables, where this
  4. * aggregation is defined by a specified function.
  5. * <p>
  6. * <img width="640" height="380" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/combineLatest.png" alt="">
  7. * <dl>
  8. * <dt><b>Scheduler:</b></dt>
  9. * <dd>{@code combineLatest} does not operate by default on a particular {@link Scheduler}.</dd>
  10. * </dl>
  11. *
  12. * @param o1
  13. * the first source Observable
  14. * @param o2
  15. * the second source Observable
  16. * @param combineFunction
  17. * the aggregation function used to combine the items emitted by the source Observables
  18. * @return an Observable that emits items that are the result of combining the items emitted by the source
  19. * Observables by means of the given aggregation function
  20. * @see <a href="https://github.com/Netflix/RxJava/wiki/Combining-Observables#combinelatest">RxJava wiki: combineLatest</a>
  21. * @see <a href="http://msdn.microsoft.com/en-us/library/hh211991.aspx">MSDN: Observable.CombineLatest</a>
  22. */
  23. @SuppressWarnings("unchecked")
  24. public static final <T1, T2, R> Observable<R> combineLatest(Observable<? extends T1> o1, Observable<? extends T2> o2, Func2<? super T1, ? super T2, ? extends R> combineFunction) {
  25. return combineLatest(Arrays.asList(o1, o2), Functions.fromFunc(combineFunction));
  26. }

代码示例来源:origin: com.netflix.ocelli/ocelli-core

  1. /**
  2. * Given a list of observables that emit a boolean condition AND all conditions whenever
  3. * any condition changes and emit the resulting condition when the final condition changes.
  4. * @param sources
  5. * @return
  6. */
  7. public static Observable<Boolean> conditionAnder(List<Observable<Boolean>> sources) {
  8. return Observable.combineLatest(sources, new FuncN<Observable<Boolean>>() {
  9. @Override
  10. public Observable<Boolean> call(Object... args) {
  11. return Observable.from(args).cast(Boolean.class).firstOrDefault(true, new Func1<Boolean, Boolean>() {
  12. @Override
  13. public Boolean call(Boolean status) {
  14. return !status;
  15. }
  16. });
  17. }
  18. })
  19. .flatMap(new Func1<Observable<Boolean>, Observable<Boolean>>() {
  20. @Override
  21. public Observable<Boolean> call(Observable<Boolean> t1) {
  22. return t1;
  23. }
  24. })
  25. .distinctUntilChanged();
  26. }

代码示例来源:origin: com.netflix.rxjava/rxjava-core

  1. public static final <T1, T2, T3, T4, R> Observable<R> combineLatest(Observable<? extends T1> o1, Observable<? extends T2> o2, Observable<? extends T3> o3, Observable<? extends T4> o4,
  2. Func4<? super T1, ? super T2, ? super T3, ? super T4, ? extends R> combineFunction) {
  3. return combineLatest(Arrays.asList(o1, o2, o3, o4), Functions.fromFunc(combineFunction));

代码示例来源:origin: com.netflix.rxjava/rxjava-core

  1. /**
  2. * Combines three source Observables by emitting an item that aggregates the latest values of each of the
  3. * source Observables each time an item is received from any of the source Observables, where this
  4. * aggregation is defined by a specified function.
  5. * <p>
  6. * <img width="640" height="380" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/combineLatest.png" alt="">
  7. * <dl>
  8. * <dt><b>Scheduler:</b></dt>
  9. * <dd>{@code combineLatest} does not operate by default on a particular {@link Scheduler}.</dd>
  10. * </dl>
  11. *
  12. * @param o1
  13. * the first source Observable
  14. * @param o2
  15. * the second source Observable
  16. * @param o3
  17. * the third source Observable
  18. * @param combineFunction
  19. * the aggregation function used to combine the items emitted by the source Observables
  20. * @return an Observable that emits items that are the result of combining the items emitted by the source
  21. * Observables by means of the given aggregation function
  22. * @see <a href="https://github.com/Netflix/RxJava/wiki/Combining-Observables#combinelatest">RxJava wiki: combineLatest</a>
  23. * @see <a href="http://msdn.microsoft.com/en-us/library/hh211991.aspx">MSDN: Observable.CombineLatest</a>
  24. */
  25. @SuppressWarnings("unchecked")
  26. public static final <T1, T2, T3, R> Observable<R> combineLatest(Observable<? extends T1> o1, Observable<? extends T2> o2, Observable<? extends T3> o3, Func3<? super T1, ? super T2, ? super T3, ? extends R> combineFunction) {
  27. return combineLatest(Arrays.asList(o1, o2, o3), Functions.fromFunc(combineFunction));
  28. }

代码示例来源:origin: chiclaim/AndroidRxJavaSample

  1. private Observable<User> fetchFriendsInfo(User user) {
  2. Observable<User> observableUser = Observable.just(user);
  3. Observable<List<User>> observableUsers = Observable
  4. .from(user.getFriends())
  5. .flatMap(new Func1<User, Observable<User>>() {
  6. @Override
  7. public Observable<User> call(User user) {
  8. return userApi.fetchUserInfo(user.getId() + "");
  9. }
  10. })
  11. .toList();
  12. return Observable.combineLatest(observableUser, observableUsers, new Func2<User, List<User>, User>() {
  13. @Override
  14. public User call(User user, List<User> users) {
  15. user.setFriends(users);
  16. return user;
  17. }
  18. });
  19. }

代码示例来源:origin: marcoRS/rxjava-essentials

  1. private void loadList(List<AppInfo> apps) {
  2. mRecyclerView.setVisibility(View.VISIBLE);
  3. Observable<AppInfo> appsSequence = Observable.interval(1000, TimeUnit.MILLISECONDS)
  4. .map(position -> apps.get(position.intValue()));
  5. Observable<Long> tictoc = Observable.interval(1500, TimeUnit.MILLISECONDS);
  6. Observable.combineLatest(appsSequence, tictoc, this::updateTitle)
  7. .observeOn(AndroidSchedulers.mainThread())
  8. .subscribe(new Observer<AppInfo>() {
  9. @Override public void onCompleted() {
  10. Toast.makeText(getActivity(), "Here is the list!", Toast.LENGTH_LONG).show();
  11. }
  12. @Override public void onError(Throwable e) {
  13. mSwipeRefreshLayout.setRefreshing(false);
  14. Toast.makeText(getActivity(), "Something went wrong!", Toast.LENGTH_SHORT).show();
  15. }
  16. @Override public void onNext(AppInfo appInfo) {
  17. if (mSwipeRefreshLayout.isRefreshing()) {
  18. mSwipeRefreshLayout.setRefreshing(false);
  19. }
  20. mAddedApps.add(appInfo);
  21. int position = mAddedApps.size() - 1;
  22. mAdapter.addApplication(position, appInfo);
  23. mRecyclerView.smoothScrollToPosition(position);
  24. }
  25. });
  26. }

代码示例来源:origin: jacek-marchwicki/JavaWebsocketClient

  1. public void sendPingWhenConnected() {
  2. Observable.combineLatest(
  3. Observable.interval(5, TimeUnit.SECONDS, scheduler),
  4. connectedAndRegistered,
  5. new Func2<Long, RxObjectEventConn, RxObjectEventConn>() {
  6. @Override
  7. public RxObjectEventConn call(Long aLong, RxObjectEventConn rxEventConn) {
  8. return rxEventConn;
  9. }
  10. })
  11. .compose(isConnected())
  12. .flatMap(new Func1<RxObjectEventConn, Observable<?>>() {
  13. @Override
  14. public Observable<?> call(RxObjectEventConn rxEventConn) {
  15. return RxMoreObservables.sendObjectMessage(rxEventConn.sender(), new PingMessage("send_only_when_connected"))
  16. .toObservable();
  17. }
  18. })
  19. .subscribe();
  20. }

代码示例来源:origin: Q42/RxPromise

  1. /**
  2. * Given an {@link Iterable} of promises, return a promise that is fulfilled when all the items in the {@link Iterable} are fulfilled.
  3. * The promise's fulfillment value is a {@link List} with fulfillment values in the original order.
  4. * If any promise rejects, the returned promise is rejected immediately with the rejection reason.
  5. *
  6. * @param promises The {@link List} of promises
  7. * @return A promise that combines all values of given promises into a {@link List}
  8. */
  9. @SuppressWarnings("unchecked")
  10. public static <T> Promise<List<T>> all(Iterable<Promise<T>> promises) {
  11. List<Observable<T>> observables = coerceToList(promises, Promise.<T>coerceToObservable());
  12. if (observables.isEmpty()) {
  13. return just(Collections.<T>emptyList());
  14. }
  15. return from(combineLatest(observables, new FuncN<List<T>>() {
  16. @Override
  17. public List<T> call(Object... args) {
  18. return asList((T[]) args);
  19. }
  20. }));
  21. }

代码示例来源:origin: TangoAgency/android-data-binding-rxjava

  1. public MainViewModel() {
  2. Observable.combineLatest(toObservable(firstName), toObservable(lastName), (firstName, lastName) -> StringUtils.isNotNullOrEmpty(firstName) && StringUtils.isNotNullOrEmpty(lastName))
  3. .subscribe(result -> {
  4. helloButtonEnabled.set(result);
  5. if (!result) {
  6. helloText.set(StringUtils.EMPTY);
  7. }
  8. }, Throwable::printStackTrace);
  9. }

代码示例来源:origin: SmartDengg/RxBlur

  1. @Override public void setListener() {
  2. Observable<TextViewTextChangeEvent> radiusObservable =
  3. RxTextView.textChangeEvents(radiusEt).skip(1);
  4. Observable<TextViewTextChangeEvent> durationObservable =
  5. RxTextView.textChangeEvents(durationEt).skip(1);
  6. Observable.combineLatest(radiusObservable, durationObservable,
  7. new Func2<TextViewTextChangeEvent, TextViewTextChangeEvent, Boolean>() {
  8. @Override public Boolean call(TextViewTextChangeEvent radiusEvent,
  9. TextViewTextChangeEvent durationEvent) {
  10. radius = radiusEvent.text().toString();
  11. duration = durationEvent.text().toString();
  12. return !TextUtils.isEmpty(radius) && !TextUtils.isEmpty(duration);
  13. }
  14. })
  15. .debounce(300, TimeUnit.MILLISECONDS)
  16. .compose(RxAnimatorBlurActivity.this.<Boolean>bindUntilEvent(ActivityEvent.DESTROY))
  17. .observeOn(AndroidSchedulers.mainThread())
  18. .startWith(false)
  19. .subscribe(new Action1<Boolean>() {
  20. @Override public void call(Boolean aBoolean) {
  21. blurBtn.setEnabled(aBoolean);
  22. }
  23. });
  24. }

代码示例来源:origin: nurkiewicz/rxjava-book-examples

  1. @Test
  2. public void sample_345() throws Exception {
  3. Observable.combineLatest(
  4. interval(17, MILLISECONDS).map(x -> "S" + x),
  5. interval(10, MILLISECONDS).map(x -> "F" + x),
  6. (s, f) -> f + ":" + s
  7. ).forEach(System.out::println);
  8. Sleeper.sleep(Duration.ofSeconds(2));
  9. }

代码示例来源:origin: nurkiewicz/rxjava-book-examples

  1. @Test
  2. public void sample_121() throws Exception {
  3. EditText latText = null;//...
  4. EditText lonText = null;//...
  5. Observable<Double> latChanges = RxTextView
  6. .afterTextChangeEvents(latText)
  7. .flatMap(toDouble());
  8. Observable<Double> lonChanges = RxTextView
  9. .afterTextChangeEvents(lonText)
  10. .flatMap(toDouble());
  11. Observable<Cities> cities = Observable
  12. .combineLatest(latChanges, lonChanges, toPair())
  13. .debounce(1, TimeUnit.SECONDS)
  14. .flatMap(listCitiesNear());
  15. }

相关文章

Observable类方法