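This article collects code examples for the Java method rx.Observable.combineLatest() and shows how Observable.combineLatest() is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven. They were extracted from a selection of projects, so they should serve as a useful reference. Details of Observable.combineLatest() are as follows: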
Package: rx (fully qualified class name: rx.Observable)
Class name: Observable
Method name: combineLatest
Description: Combines a list of source Observables by emitting an item that aggregates the latest values of each of the source Observables each time an item is received from any of the source Observables, where this aggregation is defined by a specified function. Scheduler: combineLatest does not operate by default on a particular Scheduler.
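To make the description above concrete, here is a minimal sketch of the two-source semantics written with the standard library only. It is not RxJava's implementation: the class `CombineLatestSketch`, the `Emission` type, and the idea of replaying a pre-ordered timeline are all assumptions made for illustration, and threading, completion, and error handling are ignored.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiFunction;

// Hypothetical, stdlib-only illustration of combineLatest semantics for two sources.
class CombineLatestSketch {

    /** One emission: which source (0 or 1) produced it, and the emitted value. */
    static final class Emission {
        final int source;
        final Object value;

        Emission(int source, Object value) {
            this.source = source;
            this.value = value;
        }
    }

    /**
     * Replays emissions in arrival order. Each emission updates the latest value
     * of its source; once both sources have emitted at least once, every further
     * emission produces one combined item.
     */
    @SuppressWarnings("unchecked")
    static <A, B, R> List<R> combineLatest(List<Emission> timeline, BiFunction<A, B, R> combiner) {
        Object[] latest = new Object[2];
        boolean[] seen = new boolean[2];
        List<R> out = new ArrayList<>();
        for (Emission e : timeline) {
            latest[e.source] = e.value;
            seen[e.source] = true;
            if (seen[0] && seen[1]) {
                out.add(combiner.apply((A) latest[0], (B) latest[1]));
            }
        }
        return out;
    }

    /** Interleaves the numbers 1, 2, 3 (source 0) with the letters a, b (source 1). */
    static List<String> demo() {
        List<Emission> timeline = Arrays.asList(
                new Emission(0, 1),
                new Emission(1, "a"),
                new Emission(0, 2),
                new Emission(1, "b"),
                new Emission(0, 3));
        return CombineLatestSketch.<Integer, String, String>combineLatest(
                timeline, (n, s) -> n + s);
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [1a, 2a, 2b, 3b]
    }
}
```

Note that the first emission (1 from source 0) produces no output because source 1 has not emitted yet. After that, every emission from either source yields one combined item.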
Code example source: konmik/nucleus
@Override
public Observable<Delivery<View, T>> call(Observable<T> observable) {
    return Observable
        .combineLatest(
            view,
            observable
                .materialize()
                .filter(new Func1<Notification<T>, Boolean>() {
                    @Override
                    public Boolean call(Notification<T> notification) {
                        return !notification.isOnCompleted();
                    }
                }),
            new Func2<View, Notification<T>, Delivery<View, T>>() {
                @Override
                public Delivery<View, T> call(View view, Notification<T> notification) {
                    return view == null ? null : new Delivery<>(view, notification);
                }
            })
        .filter(new Func1<Delivery<View, T>, Boolean>() {
            @Override
            public Boolean call(Delivery<View, T> delivery) {
                return delivery != null;
            }
        });
}
Code example source: BaronZ88/MinimalistWeather
Observable<EnvironmentCloudCityAirLive> airLiveObservable = ApiClient.environmentCloudWeatherService.getAirLive(cityId);
observableForGetWeatherFromNetWork = Observable.combineLatest(weatherLiveObservable, forecastObservable, airLiveObservable,
(weatherLive, forecast, airLive) -> new CloudWeatherAdapter(weatherLive, forecast, airLive).getWeather());
Code example source: THEONE10211024/RxJavaSamples
private void _combineLatestEvents() {
_subscription = Observable.combineLatest(_emailChangeObservable,
_passwordChangeObservable,
_numberChangeObservable,
Code example source: ribot/ribot-app-android
public void loadVenues() {
getMvpView().showVenuesProgress(true);
mSubscriptions.add(Observable.combineLatest(
getTodayLatestCheckInAtVenue().defaultIfEmpty(null), mDataManager.getVenues(),
new Func2<CheckIn, List<Venue>, VenuesInfo>() {
@Override
public VenuesInfo call(CheckIn checkIn, List<Venue> venues) {
return new VenuesInfo(checkIn, venues);
}
})
.observeOn(AndroidSchedulers.mainThread())
.subscribeOn(Schedulers.io())
.subscribe(new Subscriber<VenuesInfo>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
Timber.e("Error loading venues " + e);
getMvpView().showVenuesProgress(false);
}
@Override
public void onNext(VenuesInfo venuesInfo) {
getMvpView().showVenues(
venuesInfo.listVenues,
venuesInfo.getTodayLatestCheckInAtVenueId());
getMvpView().showVenuesProgress(false);
}
}));
}
Code example source: jacek-marchwicki/JavaWebsocketClient
@Nonnull
public Observable<ItemsWithScroll> itemsWithScrollObservable() {
return Observable.combineLatest(items, lastItemInView, new Func2<ImmutableList<MainPresenter.AdapterItem>, Boolean, ItemsWithScroll>() {
@Override
public ItemsWithScroll call(ImmutableList<MainPresenter.AdapterItem> adapterItems, Boolean isLastItemInList) {
final int lastItemPosition = adapterItems.size() - 1;
final boolean shouldScroll = isLastItemInList && lastItemPosition >= 0;
return new ItemsWithScroll(adapterItems, shouldScroll, lastItemPosition);
}
});
}
Code example source: henrymorgen/android-advanced-light
private void combineLastest() {
Observable<Integer> obs1 =Observable.just(1,2,3);
Observable<String> obs2 =Observable.just("a","b","c");
Observable.combineLatest(obs1, obs2, new Func2<Integer, String, String>() {
@Override
public String call(Integer integer, String s) {
return integer+s;
}
}).subscribe(new Action1<String>() {
@Override
public void call(String s) {
Log.d(TAG, "combineLastest:"+s);
}
});
}
Code example source: xiaolongonly/Ticket-Analysis
public static Subscription getLimitEnableStateSubscription(View enableView, FuncN<Boolean> funcN, EditText... editTexts) {
    List<Observable<CharSequence>> observableList = new ArrayList<>();
    for (EditText editText : editTexts) {
        observableList.add(RxTextView.textChanges(editText));
    }
    return Observable.combineLatest(observableList, funcN)
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(isEnable -> enableView.setEnabled(isEnable), throwable -> enableView.setEnabled(false));
}
Code example source: com.netflix.eureka/eureka2-server
public SelfInfoResolverChain(ChainableSelfInfoResolver... resolvers) {
super(Observable.combineLatest(getObservableList(resolvers), new FuncN<InstanceInfo.Builder>() {
@Override
public InstanceInfo.Builder call(Object... args) {
InstanceInfo.Builder seed = new InstanceInfo.Builder();
for (Object obj : args) {
InstanceInfo.Builder builder = (InstanceInfo.Builder) obj;
seed.withBuilder(new InstanceInfo.Builder().withBuilder(builder)); // clone at each step
}
return seed;
}
})
);
}
Code example source: com.netflix.rxjava/rxjava-core
/**
* Combines two source Observables by emitting an item that aggregates the latest values of each of the
* source Observables each time an item is received from either of the source Observables, where this
* aggregation is defined by a specified function.
* <p>
* <img width="640" height="380" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/combineLatest.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code combineLatest} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param o1
* the first source Observable
* @param o2
* the second source Observable
* @param combineFunction
* the aggregation function used to combine the items emitted by the source Observables
* @return an Observable that emits items that are the result of combining the items emitted by the source
* Observables by means of the given aggregation function
* @see <a href="https://github.com/Netflix/RxJava/wiki/Combining-Observables#combinelatest">RxJava wiki: combineLatest</a>
* @see <a href="http://msdn.microsoft.com/en-us/library/hh211991.aspx">MSDN: Observable.CombineLatest</a>
*/
@SuppressWarnings("unchecked")
public static final <T1, T2, R> Observable<R> combineLatest(Observable<? extends T1> o1, Observable<? extends T2> o2, Func2<? super T1, ? super T2, ? extends R> combineFunction) {
return combineLatest(Arrays.asList(o1, o2), Functions.fromFunc(combineFunction));
}
Code example source: com.netflix.ocelli/ocelli-core
/**
* Given a list of observables that emit a boolean condition AND all conditions whenever
* any condition changes and emit the resulting condition when the final condition changes.
* @param sources
* @return
*/
public static Observable<Boolean> conditionAnder(List<Observable<Boolean>> sources) {
return Observable.combineLatest(sources, new FuncN<Observable<Boolean>>() {
@Override
public Observable<Boolean> call(Object... args) {
return Observable.from(args).cast(Boolean.class).firstOrDefault(true, new Func1<Boolean, Boolean>() {
@Override
public Boolean call(Boolean status) {
return !status;
}
});
}
})
.flatMap(new Func1<Observable<Boolean>, Observable<Boolean>>() {
@Override
public Observable<Boolean> call(Observable<Boolean> t1) {
return t1;
}
})
.distinctUntilChanged();
}
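The combiner in the example above effectively ANDs the latest boolean of every source, and `distinctUntilChanged()` then suppresses consecutive repeats. The following stdlib-only sketch shows those two steps in isolation. The class `ConditionAndSketch` and its method names are hypothetical and are not part of ocelli or RxJava.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical, stdlib-only illustration of "AND all latest conditions,
// then emit only when the result changes".
class ConditionAndSketch {

    /** True only if every latest value is Boolean.TRUE (vacuously true for no values). */
    static boolean allTrue(Object... latestValues) {
        return Arrays.stream(latestValues).allMatch(Boolean.TRUE::equals);
    }

    /** Drops every value that equals its immediate predecessor. */
    static List<Boolean> distinctUntilChanged(List<Boolean> values) {
        List<Boolean> out = new ArrayList<>();
        for (Boolean v : values) {
            if (out.isEmpty() || !out.get(out.size() - 1).equals(v)) {
                out.add(v);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Successive snapshots of the latest values of two condition sources.
        List<Boolean> combined = Arrays.asList(
                allTrue(true, true),
                allTrue(true, true),
                allTrue(false, true),
                allTrue(false, false),
                allTrue(true, true));
        System.out.println(distinctUntilChanged(combined)); // [true, false, true]
    }
}
```

With a combiner like `allTrue`, the `FuncN` could return a plain `Boolean`, which would make the `flatMap` step in the original unnecessary. That is a design observation, not a claim about why the original was written the way it was.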
Code example source: com.netflix.rxjava/rxjava-core
public static final <T1, T2, T3, T4, R> Observable<R> combineLatest(Observable<? extends T1> o1, Observable<? extends T2> o2, Observable<? extends T3> o3, Observable<? extends T4> o4,
        Func4<? super T1, ? super T2, ? super T3, ? super T4, ? extends R> combineFunction) {
    return combineLatest(Arrays.asList(o1, o2, o3, o4), Functions.fromFunc(combineFunction));
}
Code example source: com.netflix.rxjava/rxjava-core
/**
* Combines three source Observables by emitting an item that aggregates the latest values of each of the
* source Observables each time an item is received from any of the source Observables, where this
* aggregation is defined by a specified function.
* <p>
* <img width="640" height="380" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/combineLatest.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code combineLatest} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param o1
* the first source Observable
* @param o2
* the second source Observable
* @param o3
* the third source Observable
* @param combineFunction
* the aggregation function used to combine the items emitted by the source Observables
* @return an Observable that emits items that are the result of combining the items emitted by the source
* Observables by means of the given aggregation function
* @see <a href="https://github.com/Netflix/RxJava/wiki/Combining-Observables#combinelatest">RxJava wiki: combineLatest</a>
* @see <a href="http://msdn.microsoft.com/en-us/library/hh211991.aspx">MSDN: Observable.CombineLatest</a>
*/
@SuppressWarnings("unchecked")
public static final <T1, T2, T3, R> Observable<R> combineLatest(Observable<? extends T1> o1, Observable<? extends T2> o2, Observable<? extends T3> o3, Func3<? super T1, ? super T2, ? super T3, ? extends R> combineFunction) {
return combineLatest(Arrays.asList(o1, o2, o3), Functions.fromFunc(combineFunction));
}
Code example source: chiclaim/AndroidRxJavaSample
private Observable<User> fetchFriendsInfo(User user) {
Observable<User> observableUser = Observable.just(user);
Observable<List<User>> observableUsers = Observable
.from(user.getFriends())
.flatMap(new Func1<User, Observable<User>>() {
@Override
public Observable<User> call(User user) {
return userApi.fetchUserInfo(user.getId() + "");
}
})
.toList();
return Observable.combineLatest(observableUser, observableUsers, new Func2<User, List<User>, User>() {
@Override
public User call(User user, List<User> users) {
user.setFriends(users);
return user;
}
});
}
Code example source: marcoRS/rxjava-essentials
private void loadList(List<AppInfo> apps) {
mRecyclerView.setVisibility(View.VISIBLE);
Observable<AppInfo> appsSequence = Observable.interval(1000, TimeUnit.MILLISECONDS)
.map(position -> apps.get(position.intValue()));
Observable<Long> tictoc = Observable.interval(1500, TimeUnit.MILLISECONDS);
Observable.combineLatest(appsSequence, tictoc, this::updateTitle)
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<AppInfo>() {
@Override public void onCompleted() {
Toast.makeText(getActivity(), "Here is the list!", Toast.LENGTH_LONG).show();
}
@Override public void onError(Throwable e) {
mSwipeRefreshLayout.setRefreshing(false);
Toast.makeText(getActivity(), "Something went wrong!", Toast.LENGTH_SHORT).show();
}
@Override public void onNext(AppInfo appInfo) {
if (mSwipeRefreshLayout.isRefreshing()) {
mSwipeRefreshLayout.setRefreshing(false);
}
mAddedApps.add(appInfo);
int position = mAddedApps.size() - 1;
mAdapter.addApplication(position, appInfo);
mRecyclerView.smoothScrollToPosition(position);
}
});
}
Code example source: jacek-marchwicki/JavaWebsocketClient
public void sendPingWhenConnected() {
Observable.combineLatest(
Observable.interval(5, TimeUnit.SECONDS, scheduler),
connectedAndRegistered,
new Func2<Long, RxObjectEventConn, RxObjectEventConn>() {
@Override
public RxObjectEventConn call(Long aLong, RxObjectEventConn rxEventConn) {
return rxEventConn;
}
})
.compose(isConnected())
.flatMap(new Func1<RxObjectEventConn, Observable<?>>() {
@Override
public Observable<?> call(RxObjectEventConn rxEventConn) {
return RxMoreObservables.sendObjectMessage(rxEventConn.sender(), new PingMessage("send_only_when_connected"))
.toObservable();
}
})
.subscribe();
}
Code example source: Q42/RxPromise
/**
* Given an {@link Iterable} of promises, return a promise that is fulfilled when all the items in the {@link Iterable} are fulfilled.
* The promise's fulfillment value is a {@link List} with fulfillment values in the original order.
* If any promise rejects, the returned promise is rejected immediately with the rejection reason.
*
* @param promises The {@link List} of promises
* @return A promise that combines all values of given promises into a {@link List}
*/
@SuppressWarnings("unchecked")
public static <T> Promise<List<T>> all(Iterable<Promise<T>> promises) {
List<Observable<T>> observables = coerceToList(promises, Promise.<T>coerceToObservable());
if (observables.isEmpty()) {
return just(Collections.<T>emptyList());
}
return from(combineLatest(observables, new FuncN<List<T>>() {
@Override
public List<T> call(Object... args) {
return asList((T[]) args);
}
}));
}
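The list overload used above passes the latest values to the combiner as an `Object...` array ordered by source position, not by arrival time. A rough stdlib-only sketch of that behavior follows. `CombineLatestListSketch` and the `{sourceIndex, value}` arrival encoding are assumptions for illustration, not RxJava or RxPromise code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Hypothetical, stdlib-only illustration of combineLatest over N sources
// with a combiner that receives the latest values as an Object[].
class CombineLatestListSketch {

    /**
     * Replays arrivals, each encoded as {sourceIndex, value}, in arrival order.
     * Nothing is emitted until every source has produced at least one value.
     */
    static <R> List<R> combineLatest(int sourceCount, List<Object[]> arrivals,
                                     Function<Object[], R> combiner) {
        Object[] latest = new Object[sourceCount];
        boolean[] seen = new boolean[sourceCount];
        int seenCount = 0;
        List<R> out = new ArrayList<>();
        for (Object[] arrival : arrivals) {
            int index = (Integer) arrival[0];
            if (!seen[index]) {
                seen[index] = true;
                seenCount++;
            }
            latest[index] = arrival[1];
            if (seenCount == sourceCount) {
                // Copy so later arrivals cannot mutate an already emitted snapshot.
                out.add(combiner.apply(latest.clone()));
            }
        }
        return out;
    }

    /** Three single-value sources that finish out of order: 2, then 0, then 1. */
    static List<List<Object>> demo() {
        List<Object[]> arrivals = Arrays.asList(
                new Object[] {2, "c"},
                new Object[] {0, "a"},
                new Object[] {1, "b"});
        return CombineLatestListSketch.<List<Object>>combineLatest(
                3, arrivals, args -> Arrays.asList(args));
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [[a, b, c]]
    }
}
```

Because each source here emits exactly one value, the combined result is emitted once, with the values in source order regardless of which source finished first. This matches the "original order" guarantee stated in the Javadoc above.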
Code example source: TangoAgency/android-data-binding-rxjava
public MainViewModel() {
Observable.combineLatest(toObservable(firstName), toObservable(lastName), (firstName, lastName) -> StringUtils.isNotNullOrEmpty(firstName) && StringUtils.isNotNullOrEmpty(lastName))
.subscribe(result -> {
helloButtonEnabled.set(result);
if (!result) {
helloText.set(StringUtils.EMPTY);
}
}, Throwable::printStackTrace);
}
Code example source: SmartDengg/RxBlur
@Override public void setListener() {
Observable<TextViewTextChangeEvent> radiusObservable =
RxTextView.textChangeEvents(radiusEt).skip(1);
Observable<TextViewTextChangeEvent> durationObservable =
RxTextView.textChangeEvents(durationEt).skip(1);
Observable.combineLatest(radiusObservable, durationObservable,
new Func2<TextViewTextChangeEvent, TextViewTextChangeEvent, Boolean>() {
@Override public Boolean call(TextViewTextChangeEvent radiusEvent,
TextViewTextChangeEvent durationEvent) {
radius = radiusEvent.text().toString();
duration = durationEvent.text().toString();
return !TextUtils.isEmpty(radius) && !TextUtils.isEmpty(duration);
}
})
.debounce(300, TimeUnit.MILLISECONDS)
.compose(RxAnimatorBlurActivity.this.<Boolean>bindUntilEvent(ActivityEvent.DESTROY))
.observeOn(AndroidSchedulers.mainThread())
.startWith(false)
.subscribe(new Action1<Boolean>() {
@Override public void call(Boolean aBoolean) {
blurBtn.setEnabled(aBoolean);
}
});
}
Code example source: nurkiewicz/rxjava-book-examples
@Test
public void sample_345() throws Exception {
Observable.combineLatest(
interval(17, MILLISECONDS).map(x -> "S" + x),
interval(10, MILLISECONDS).map(x -> "F" + x),
(s, f) -> f + ":" + s
).forEach(System.out::println);
Sleeper.sleep(Duration.ofSeconds(2));
}
Code example source: nurkiewicz/rxjava-book-examples
@Test
public void sample_121() throws Exception {
EditText latText = null;//...
EditText lonText = null;//...
Observable<Double> latChanges = RxTextView
.afterTextChangeEvents(latText)
.flatMap(toDouble());
Observable<Double> lonChanges = RxTextView
.afterTextChangeEvents(lonText)
.flatMap(toDouble());
Observable<Cities> cities = Observable
.combineLatest(latChanges, lonChanges, toPair())
.debounce(1, TimeUnit.SECONDS)
.flatMap(listCitiesNear());
}