Usage and code examples of the rx.Observable.combineLatest() method

x33g5p2x, reposted on 2022-01-25 under Other

This article collects code examples for the Java method rx.Observable.combineLatest() and shows how Observable.combineLatest() is used in practice. The examples come mainly from GitHub, Stack Overflow, and Maven, and were extracted from a selection of projects. Details of the method:
Package: rx
Class name: Observable
Method name: combineLatest

About Observable.combineLatest

Combines a list of source Observables by emitting an item that aggregates the latest values of each of the source Observables each time an item is received from any of the source Observables, where this aggregation is defined by a specified function. Scheduler: combineLatest does not operate by default on a particular Scheduler.
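To make the description concrete, here is a minimal, dependency-free sketch of the behaviour for two sources. It is a plain-Java model, not RxJava's implementation: the class `CombineLatestSketch` and its methods `onNextA`, `onNextB`, and `demo` are hypothetical names introduced only for illustration. It assumes, as the operator is generally documented to behave, that nothing is emitted until every source has produced at least one item.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;

// Hypothetical model of combineLatest for two sources (not RxJava code).
class CombineLatestSketch<A, B, R> {
    private final BiFunction<A, B, R> combiner;
    private final List<R> emitted = new ArrayList<>();
    private A latestA;
    private B latestB;
    private boolean hasA; // true once source A has produced an item
    private boolean hasB; // true once source B has produced an item

    CombineLatestSketch(BiFunction<A, B, R> combiner) {
        this.combiner = combiner;
    }

    // Called when the first source produces an item.
    void onNextA(A value) {
        latestA = value;
        hasA = true;
        emitIfReady();
    }

    // Called when the second source produces an item.
    void onNextB(B value) {
        latestB = value;
        hasB = true;
        emitIfReady();
    }

    // Emits a combined item only after both sources have a latest value.
    private void emitIfReady() {
        if (hasA && hasB) {
            emitted.add(combiner.apply(latestA, latestB));
        }
    }

    List<R> emitted() {
        return emitted;
    }

    // Replays an interleaved sequence of events and returns what was emitted.
    static List<String> demo() {
        CombineLatestSketch<Integer, String, String> c =
                new CombineLatestSketch<>((n, s) -> n + s);
        c.onNextA(1);   // no output: B has not emitted yet
        c.onNextB("a"); // 1a
        c.onNextA(2);   // 2a
        c.onNextB("b"); // 2b
        return c.emitted();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [1a, 2a, 2b]
    }
}
```

With the real operator the same pattern is written as `Observable.combineLatest(o1, o2, combineFunction)`; the model only mirrors the bookkeeping of latest values.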

Code examples

Code example source: konmik/nucleus

@Override
  public Observable<Delivery<View, T>> call(Observable<T> observable) {
    return Observable
      .combineLatest(
        view,
        observable
          .materialize()
          .filter(new Func1<Notification<T>, Boolean>() {
            @Override
            public Boolean call(Notification<T> notification) {
              return !notification.isOnCompleted();
            }
          }),
        new Func2<View, Notification<T>, Delivery<View, T>>() {
          @Override
          public Delivery<View, T> call(View view, Notification<T> notification) {
            return view == null ? null : new Delivery<>(view, notification);
          }
        })
      .filter(new Func1<Delivery<View, T>, Boolean>() {
        @Override
        public Boolean call(Delivery<View, T> delivery) {
          return delivery != null;
        }
      });
  }

Code example source: BaronZ88/MinimalistWeather

Observable<EnvironmentCloudCityAirLive> airLiveObservable = ApiClient.environmentCloudWeatherService.getAirLive(cityId);
observableForGetWeatherFromNetWork = Observable.combineLatest(weatherLiveObservable, forecastObservable, airLiveObservable,
    (weatherLive, forecast, airLive) -> new CloudWeatherAdapter(weatherLive, forecast, airLive).getWeather());

Code example source: THEONE10211024/RxJavaSamples

private void _combineLatestEvents() {
  _subscription = Observable.combineLatest(_emailChangeObservable,
     _passwordChangeObservable,
     _numberChangeObservable,
     // ... (the rest of this call is truncated in the source)

Code example source: ribot/ribot-app-android

public void loadVenues() {
  getMvpView().showVenuesProgress(true);
  mSubscriptions.add(Observable.combineLatest(
      getTodayLatestCheckInAtVenue().defaultIfEmpty(null), mDataManager.getVenues(),
      new Func2<CheckIn, List<Venue>, VenuesInfo>() {
        @Override
        public VenuesInfo call(CheckIn checkIn, List<Venue> venues) {
          return new VenuesInfo(checkIn, venues);
        }
      })
      .observeOn(AndroidSchedulers.mainThread())
      .subscribeOn(Schedulers.io())
      .subscribe(new Subscriber<VenuesInfo>() {
        @Override
        public void onCompleted() {
        }
        @Override
        public void onError(Throwable e) {
          Timber.e("Error loading venues " + e);
          getMvpView().showVenuesProgress(false);
        }
        @Override
        public void onNext(VenuesInfo venuesInfo) {
          getMvpView().showVenues(
              venuesInfo.listVenues,
              venuesInfo.getTodayLatestCheckInAtVenueId());
          getMvpView().showVenuesProgress(false);
        }
      }));
}

Code example source: jacek-marchwicki/JavaWebsocketClient

@Nonnull
public Observable<ItemsWithScroll> itemsWithScrollObservable() {
  return Observable.combineLatest(items, lastItemInView, new Func2<ImmutableList<MainPresenter.AdapterItem>, Boolean, ItemsWithScroll>() {
    @Override
    public ItemsWithScroll call(ImmutableList<MainPresenter.AdapterItem> adapterItems, Boolean isLastItemInList) {
      final int lastItemPosition = adapterItems.size() - 1;
      final boolean shouldScroll = isLastItemInList && lastItemPosition >= 0;
      return new ItemsWithScroll(adapterItems, shouldScroll, lastItemPosition);
    }
  });
}

Code example source: henrymorgen/android-advanced-light

private void combineLatest() {
  Observable<Integer> obs1 = Observable.just(1, 2, 3);
  Observable<String> obs2 = Observable.just("a", "b", "c");
  Observable.combineLatest(obs1, obs2, new Func2<Integer, String, String>() {
    @Override
    public String call(Integer integer, String s) {
      return integer + s;
    }
  }).subscribe(new Action1<String>() {
    @Override
    public void call(String s) {
      Log.d(TAG, "combineLatest:" + s);
    }
  });
}
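The sample above combines two `Observable.just` sources, and its output can be surprising. Assuming the first source delivers all of its items before the second one is subscribed (synchronous sources in RxJava 1.x are generally expected to behave this way, though this sketch does not verify it against the library), only the last integer would be paired with each letter. The plain-Java replay below illustrates that assumed ordering; `SequentialSourcesSketch` and `replay` are hypothetical names, not part of RxJava.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java replay of one assumed delivery order; this is not RxJava code.
class SequentialSourcesSketch {

    // Hypothetical helper: consumes every item of `first`, then every item of
    // `second`, pairing each item of `second` with the latest item of `first`.
    static List<String> replay(List<Integer> first, List<String> second) {
        List<String> out = new ArrayList<>();
        Integer latestNumber = null;
        for (Integer n : first) {
            latestNumber = n; // the second source has produced nothing yet, so no output
        }
        if (latestNumber == null) {
            return out; // the first source was empty, so nothing can be combined
        }
        for (String s : second) {
            out.add(latestNumber + s);
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(replay(Arrays.asList(1, 2, 3), Arrays.asList("a", "b", "c")));
        // prints [3a, 3b, 3c]
    }
}
```

If pairing items by position is what you want (1a, 2b, 3c), `zip` is usually the operator to reach for instead of `combineLatest`.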

Code example source: xiaolongonly/Ticket-Analysis

public static Subscription getLimitEnableStateSubscription(View enableView, FuncN<Boolean> funcN, EditText... editTexts) {
  // Collect one text-change stream per EditText.
  List<Observable<CharSequence>> observableList = new ArrayList<>();
  for (EditText editText : editTexts) {
    observableList.add(RxTextView.textChanges(editText));
  }

  // Re-evaluate funcN on every change; disable the view if the stream fails.
  return Observable.combineLatest(observableList, funcN)
      .observeOn(AndroidSchedulers.mainThread())
      .subscribe(isEnable -> enableView.setEnabled(isEnable), throwable -> enableView.setEnabled(false));
}

Code example source: com.netflix.eureka/eureka2-server

public SelfInfoResolverChain(ChainableSelfInfoResolver... resolvers) {
  super(Observable.combineLatest(getObservableList(resolvers), new FuncN<InstanceInfo.Builder>() {
        @Override
        public InstanceInfo.Builder call(Object... args) {
          InstanceInfo.Builder seed = new InstanceInfo.Builder();
          for (Object obj : args) {
            InstanceInfo.Builder builder = (InstanceInfo.Builder) obj;
            seed.withBuilder(new InstanceInfo.Builder().withBuilder(builder));  // clone at each step
          }
          return seed;
        }
      })
  );
}

Code example source: com.netflix.rxjava/rxjava-core

/**
 * Combines two source Observables by emitting an item that aggregates the latest values of each of the
 * source Observables each time an item is received from either of the source Observables, where this
 * aggregation is defined by a specified function.
 * <p>
 * <img width="640" height="380" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/combineLatest.png" alt="">
 * <dl>
 *  <dt><b>Scheduler:</b></dt>
 *  <dd>{@code combineLatest} does not operate by default on a particular {@link Scheduler}.</dd>
 * </dl>
 *
 * @param o1
 *            the first source Observable
 * @param o2
 *            the second source Observable
 * @param combineFunction
 *            the aggregation function used to combine the items emitted by the source Observables
 * @return an Observable that emits items that are the result of combining the items emitted by the source
 *         Observables by means of the given aggregation function
 * @see <a href="https://github.com/Netflix/RxJava/wiki/Combining-Observables#combinelatest">RxJava wiki: combineLatest</a>
 * @see <a href="http://msdn.microsoft.com/en-us/library/hh211991.aspx">MSDN: Observable.CombineLatest</a>
 */
@SuppressWarnings("unchecked")
public static final <T1, T2, R> Observable<R> combineLatest(Observable<? extends T1> o1, Observable<? extends T2> o2, Func2<? super T1, ? super T2, ? extends R> combineFunction) {
  return combineLatest(Arrays.asList(o1, o2), Functions.fromFunc(combineFunction));
}

Code example source: com.netflix.ocelli/ocelli-core

/**
 * Given a list of observables that emit a boolean condition AND all conditions whenever
 * any condition changes and emit the resulting condition when the final condition changes.
 * @param sources
 * @return
 */
public static Observable<Boolean> conditionAnder(List<Observable<Boolean>> sources) {
  return Observable.combineLatest(sources, new FuncN<Observable<Boolean>>() {
    @Override
    public Observable<Boolean> call(Object... args) {
      return Observable.from(args).cast(Boolean.class).firstOrDefault(true, new Func1<Boolean, Boolean>() {
        @Override
        public Boolean call(Boolean status) {
          return !status;
        }
      });
    }
  })
  .flatMap(new Func1<Observable<Boolean>, Observable<Boolean>>() {
    @Override
    public Observable<Boolean> call(Observable<Boolean> t1) {
      return t1;
    }
  })
  .distinctUntilChanged();
}
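The `conditionAnder` sample above combines the latest value of every boolean source with a logical AND and then drops consecutive duplicates. The sketch below models that behaviour in plain Java without RxJava; `ConditionAndSketch`, `onNext`, and `demo` are hypothetical names for illustration only, and the model is an approximation of the semantics, not the ocelli implementation.

```java
import java.util.ArrayList;
import java.util.List;

// Dependency-free model: AND over the latest value of N boolean sources,
// suppressing consecutive duplicates. Not the ocelli or RxJava implementation.
class ConditionAndSketch {
    private final Boolean[] latest; // null entry = that source has not emitted yet
    private Boolean lastEmitted;    // null = nothing emitted so far
    private final List<Boolean> emitted = new ArrayList<>();

    ConditionAndSketch(int sourceCount) {
        latest = new Boolean[sourceCount];
    }

    // Records a new value from source `index` and emits the AND if it changed.
    void onNext(int index, boolean value) {
        latest[index] = value;
        boolean all = true;
        for (Boolean b : latest) {
            if (b == null) {
                return; // wait until every source has a value
            }
            all = all && b;
        }
        if (lastEmitted == null || lastEmitted != all) {
            lastEmitted = all;
            emitted.add(all);
        }
    }

    List<Boolean> emitted() {
        return emitted;
    }

    // Replays a fixed event sequence over two sources.
    static List<Boolean> demo() {
        ConditionAndSketch c = new ConditionAndSketch(2);
        c.onNext(0, true);  // no output: source 1 has no value yet
        c.onNext(1, true);  // true
        c.onNext(0, false); // false
        c.onNext(1, false); // still false, suppressed
        c.onNext(0, true);  // still false, suppressed
        c.onNext(1, true);  // true
        return c.emitted();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [true, false, true]
    }
}
```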

Code example source: com.netflix.rxjava/rxjava-core

public static final <T1, T2, T3, T4, R> Observable<R> combineLatest(Observable<? extends T1> o1, Observable<? extends T2> o2, Observable<? extends T3> o3, Observable<? extends T4> o4,
    Func4<? super T1, ? super T2, ? super T3, ? super T4, ? extends R> combineFunction) {
  return combineLatest(Arrays.asList(o1, o2, o3, o4), Functions.fromFunc(combineFunction));
}

Code example source: com.netflix.rxjava/rxjava-core

/**
 * Combines three source Observables by emitting an item that aggregates the latest values of each of the
 * source Observables each time an item is received from any of the source Observables, where this
 * aggregation is defined by a specified function.
 * <p>
 * <img width="640" height="380" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/combineLatest.png" alt="">
 * <dl>
 *  <dt><b>Scheduler:</b></dt>
 *  <dd>{@code combineLatest} does not operate by default on a particular {@link Scheduler}.</dd>
 * </dl>
 * 
 * @param o1
 *            the first source Observable
 * @param o2
 *            the second source Observable
 * @param o3
 *            the third source Observable
 * @param combineFunction
 *            the aggregation function used to combine the items emitted by the source Observables
 * @return an Observable that emits items that are the result of combining the items emitted by the source
 *         Observables by means of the given aggregation function
 * @see <a href="https://github.com/Netflix/RxJava/wiki/Combining-Observables#combinelatest">RxJava wiki: combineLatest</a>
 * @see <a href="http://msdn.microsoft.com/en-us/library/hh211991.aspx">MSDN: Observable.CombineLatest</a>
 */
@SuppressWarnings("unchecked")
public static final <T1, T2, T3, R> Observable<R> combineLatest(Observable<? extends T1> o1, Observable<? extends T2> o2, Observable<? extends T3> o3, Func3<? super T1, ? super T2, ? super T3, ? extends R> combineFunction) {
  return combineLatest(Arrays.asList(o1, o2, o3), Functions.fromFunc(combineFunction));
}

Code example source: chiclaim/AndroidRxJavaSample

private Observable<User> fetchFriendsInfo(User user) {
  Observable<User> observableUser = Observable.just(user);
  Observable<List<User>> observableUsers = Observable
      .from(user.getFriends())
      .flatMap(new Func1<User, Observable<User>>() {
        @Override
        public Observable<User> call(User user) {
          return userApi.fetchUserInfo(user.getId() + "");
        }
      })
      .toList();
  return Observable.combineLatest(observableUser, observableUsers, new Func2<User, List<User>, User>() {
    @Override
    public User call(User user, List<User> users) {
      user.setFriends(users);
      return user;
    }
  });
}

Code example source: marcoRS/rxjava-essentials

private void loadList(List<AppInfo> apps) {
 mRecyclerView.setVisibility(View.VISIBLE);
 Observable<AppInfo> appsSequence = Observable.interval(1000, TimeUnit.MILLISECONDS)
   .map(position -> apps.get(position.intValue()));
 Observable<Long> tictoc = Observable.interval(1500, TimeUnit.MILLISECONDS);
 Observable.combineLatest(appsSequence, tictoc, this::updateTitle)
   .observeOn(AndroidSchedulers.mainThread())
   .subscribe(new Observer<AppInfo>() {
    @Override public void onCompleted() {
     Toast.makeText(getActivity(), "Here is the list!", Toast.LENGTH_LONG).show();
    }
    @Override public void onError(Throwable e) {
     mSwipeRefreshLayout.setRefreshing(false);
     Toast.makeText(getActivity(), "Something went wrong!", Toast.LENGTH_SHORT).show();
    }
    @Override public void onNext(AppInfo appInfo) {
     if (mSwipeRefreshLayout.isRefreshing()) {
      mSwipeRefreshLayout.setRefreshing(false);
     }
     mAddedApps.add(appInfo);
     int position = mAddedApps.size() - 1;
     mAdapter.addApplication(position, appInfo);
     mRecyclerView.smoothScrollToPosition(position);
    }
   });
}

Code example source: jacek-marchwicki/JavaWebsocketClient

public void sendPingWhenConnected() {
  Observable.combineLatest(
      Observable.interval(5, TimeUnit.SECONDS, scheduler),
      connectedAndRegistered,
      new Func2<Long, RxObjectEventConn, RxObjectEventConn>() {
        @Override
        public RxObjectEventConn call(Long aLong, RxObjectEventConn rxEventConn) {
          return rxEventConn;
        }
      })
      .compose(isConnected())
      .flatMap(new Func1<RxObjectEventConn, Observable<?>>() {
        @Override
        public Observable<?> call(RxObjectEventConn rxEventConn) {
          return RxMoreObservables.sendObjectMessage(rxEventConn.sender(), new PingMessage("send_only_when_connected"))
              .toObservable();
        }
      })
      .subscribe();
}

Code example source: Q42/RxPromise

/**
 * Given an {@link Iterable} of promises, return a promise that is fulfilled when all the items in the {@link Iterable} are fulfilled.
 * The promise's fulfillment value is a {@link List} with fulfillment values in the original order.
 * If any promise rejects, the returned promise is rejected immediately with the rejection reason.
 *
 * @param promises The {@link List} of promises
 * @return A promise that combines all values of given promises into a {@link List}
 */
@SuppressWarnings("unchecked")
public static <T> Promise<List<T>> all(Iterable<Promise<T>> promises) {
  List<Observable<T>> observables = coerceToList(promises, Promise.<T>coerceToObservable());
  if (observables.isEmpty()) {
    return just(Collections.<T>emptyList());
  }
  return from(combineLatest(observables, new FuncN<List<T>>() {
    @Override
    public List<T> call(Object... args) {
      return asList((T[]) args);
    }
  }));
}

Code example source: TangoAgency/android-data-binding-rxjava

public MainViewModel() {
  Observable.combineLatest(toObservable(firstName), toObservable(lastName), (firstName, lastName) -> StringUtils.isNotNullOrEmpty(firstName) && StringUtils.isNotNullOrEmpty(lastName))
      .subscribe(result -> {
        helloButtonEnabled.set(result);
        if (!result) {
          helloText.set(StringUtils.EMPTY);
        }
      }, Throwable::printStackTrace);
}

Code example source: SmartDengg/RxBlur

@Override public void setListener() {
 Observable<TextViewTextChangeEvent> radiusObservable =
   RxTextView.textChangeEvents(radiusEt).skip(1);
 Observable<TextViewTextChangeEvent> durationObservable =
   RxTextView.textChangeEvents(durationEt).skip(1);
 Observable.combineLatest(radiusObservable, durationObservable,
   new Func2<TextViewTextChangeEvent, TextViewTextChangeEvent, Boolean>() {
    @Override public Boolean call(TextViewTextChangeEvent radiusEvent,
      TextViewTextChangeEvent durationEvent) {
     radius = radiusEvent.text().toString();
     duration = durationEvent.text().toString();
     return !TextUtils.isEmpty(radius) && !TextUtils.isEmpty(duration);
    }
   })
   .debounce(300, TimeUnit.MILLISECONDS)
   .compose(RxAnimatorBlurActivity.this.<Boolean>bindUntilEvent(ActivityEvent.DESTROY))
   .observeOn(AndroidSchedulers.mainThread())
   .startWith(false)
   .subscribe(new Action1<Boolean>() {
    @Override public void call(Boolean aBoolean) {
     blurBtn.setEnabled(aBoolean);
    }
   });
}

Code example source: nurkiewicz/rxjava-book-examples

@Test
public void sample_345() throws Exception {
  Observable.combineLatest(
      interval(17, MILLISECONDS).map(x -> "S" + x),
      interval(10, MILLISECONDS).map(x -> "F" + x),
      (s, f) -> f + ":" + s
  ).forEach(System.out::println);
  Sleeper.sleep(Duration.ofSeconds(2));
}

Code example source: nurkiewicz/rxjava-book-examples

@Test
public void sample_121() throws Exception {
  EditText latText = null;//...
  EditText lonText = null;//...
  Observable<Double> latChanges = RxTextView
      .afterTextChangeEvents(latText)
      .flatMap(toDouble());
  Observable<Double> lonChanges = RxTextView
      .afterTextChangeEvents(lonText)
      .flatMap(toDouble());
  Observable<Cities> cities = Observable
      .combineLatest(latChanges, lonChanges, toPair())
      .debounce(1, TimeUnit.SECONDS)
      .flatMap(listCitiesNear());
}
