rx.Observable.switchMap()方法的使用及代码示例

x33g5p2x  于2022-01-25 转载在 其他  
字(9.8k)|赞(0)|评价(0)|浏览(144)

本文整理了Java中rx.Observable.switchMap()方法的一些代码示例,展示了Observable.switchMap()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Observable.switchMap()方法的具体详情如下:
包路径:rx.Observable
类名称:Observable
方法名:switchMap

Observable.switchMap介绍

[英]Returns a new Observable by applying a function that you supply to each item emitted by the source Observable that returns an Observable, and then emitting the items emitted by the most recently emitted of these Observables.

Scheduler: switchMap does not operate by default on a particular Scheduler.
[中]通过对源可观测项发出的每个项应用一个函数返回一个新的可观测项,然后发射这些可观测项中最近发射的项。
调度器:switchMap默认情况下不会在特定的调度器上运行。

代码示例

代码示例来源:origin: konmik/nucleus

  1. .subscribe(subject);
  2. return view
  3. .switchMap(new Func1<View, Observable<Delivery<View, T>>>() {
  4. @Override
  5. public Observable<Delivery<View, T>> call(final View view) {

代码示例来源:origin: konmik/nucleus

  1. @Override
  2. public Observable<Delivery<View, T>> call(Observable<T> observable) {
  3. return observable.materialize()
  4. .take(1)
  5. .switchMap(new Func1<Notification<T>, Observable<? extends Delivery<View, T>>>() {
  6. @Override
  7. public Observable<? extends Delivery<View, T>> call(final Notification<T> notification) {
  8. return view.map(new Func1<View, Delivery<View, T>>() {
  9. @Override
  10. public Delivery<View, T> call(View view) {
  11. return view == null ? null : new Delivery<>(view, notification);
  12. }
  13. });
  14. }
  15. })
  16. .filter(new Func1<Delivery<View, T>, Boolean>() {
  17. @Override
  18. public Boolean call(Delivery<View, T> delivery) {
  19. return delivery != null;
  20. }
  21. })
  22. .take(1);
  23. }
  24. }

代码示例来源:origin: cn-ljb/rxjava_for_android

  1. .switchMap(new Func1<TextViewTextChangeEvent, Observable<List<String>>>() {
  2. @Override
  3. public Observable<List<String>> call(TextViewTextChangeEvent textViewTextChangeEvent) {

代码示例来源:origin: com.netflix.eureka/eureka2-core

  1. protected <T> Observable<T> submitForResult(final Callable<Observable<T>> task) {
  2. return Observable.create(new Observable.OnSubscribe<Observable<T>>() {
  3. @Override
  4. public void call(Subscriber<? super Observable<T>> subscriber) {
  5. addAndSchedule(new InvokerTaskWithResult<>(task, subscriber));
  6. }
  7. }).switchMap(new Func1<Observable<T>, Observable<? extends T>>() {
  8. @Override
  9. public Observable<? extends T> call(Observable<T> tObservable) {
  10. return tObservable;
  11. }
  12. });
  13. }

代码示例来源:origin: com.netflix.eureka2/eureka-write-server

  1. @Override
  2. public Observable<Void> update(final InstanceInfo newInfo) {
  3. if (state.get() == STATE.Closed) {
  4. return Observable.error(CHANNEL_CLOSED_EXCEPTION);
  5. }
  6. return connect().switchMap(new Func1<MessageConnection, Observable<Void>>() {
  7. @Override
  8. public Observable<Void> call(MessageConnection connection) {
  9. return connection.submit(new UpdateCopy(newInfo));
  10. }
  11. });
  12. }

代码示例来源:origin: com.netflix.eureka2/eureka-write-server

  1. @Override
  2. public Observable<Void> register(final InstanceInfo instanceInfo) {
  3. if (state.get() == STATE.Closed) {
  4. return Observable.error(CHANNEL_CLOSED_EXCEPTION);
  5. }
  6. return connect().switchMap(new Func1<MessageConnection, Observable<Void>>() {
  7. @Override
  8. public Observable<Void> call(MessageConnection connection) {
  9. return connection.submit(new RegisterCopy(instanceInfo));
  10. }
  11. });
  12. }

代码示例来源:origin: com.nytimes.android/store

  1. @Override
  2. public Observable<T> call(Observable<T> upstream) {
  3. return upstream.repeatWhen(events -> events.switchMap(aVoid -> source));
  4. }
  5. }

代码示例来源:origin: pakoito/RxComprehensions

  1. @Override
  2. public Observable<R> call(final A a) {
  3. return one.call(a).switchMap(new Func1<B, Observable<R>>() {
  4. @Override
  5. public Observable<R> call(final B b) {
  6. return two.call(a, b);
  7. }
  8. });
  9. }
  10. });

代码示例来源:origin: com.netflix.eureka2/eureka-write-server

  1. @Override
  2. public Observable<Void> unregister(final String instanceId) {
  3. if (state.get() == STATE.Closed) {
  4. return Observable.error(CHANNEL_CLOSED_EXCEPTION);
  5. }
  6. return connect().switchMap(new Func1<MessageConnection, Observable<Void>>() {
  7. @Override
  8. public Observable<Void> call(MessageConnection connection) {
  9. return connection.submit(new UnregisterCopy(instanceId));
  10. }
  11. });
  12. }
  13. }

代码示例来源:origin: pakoito/RxComprehensions

  1. /**
  2. * Composes an {@link rx.Observable} from multiple creation functions chained by {@link Observable#switchMap(Func1)}.
  3. *
  4. * @return composed Observable
  5. */
  6. public static <A, R> Observable<R> doSwitchMap(
  7. final Func0<Observable<A>> zero,
  8. final Func1<A, Observable<R>> one) {
  9. return zero.call().switchMap(new Func1<A, Observable<R>>() {
  10. @Override
  11. public Observable<R> call(final A a) {
  12. return one.call(a);
  13. }
  14. });
  15. }

代码示例来源:origin: pakoito/RxComprehensions

  1. @Override
  2. public Observable<R> call(final A a) {
  3. return one.call(a).switchMap(new Func1<B, Observable<R>>() {
  4. @Override
  5. public Observable<R> call(final B b) {
  6. return two.call(a, b).switchMap(new Func1<C, Observable<R>>() {
  7. @Override
  8. public Observable<R> call(final C c) {
  9. return three.call(a, b, c);
  10. }
  11. });
  12. }
  13. });
  14. }
  15. });

代码示例来源:origin: com.netflix.eureka2/eureka-client

  1. @Override
  2. public Observable<Void> update(final InstanceInfo newInfo) {
  3. STATES currentState = state.get();
  4. switch (currentState) {
  5. case Idle:
  6. return Observable.error(INSTANCE_NOT_REGISTERED_EXCEPTION);
  7. case Registered:
  8. //TODO: Need to serialize register -> update -> unregister. With this code both they can be interleaved
  9. return connect().switchMap(new Func1<MessageConnection, Observable<? extends Void>>() {
  10. @Override
  11. public Observable<? extends Void> call(MessageConnection connection) {
  12. return connection.submitWithAck(new Update(newInfo));
  13. }
  14. });
  15. case Closed:
  16. return Observable.error(CHANNEL_CLOSED_EXCEPTION);
  17. default:
  18. return Observable.error(new IllegalStateException("Unrecognized channel state: " + currentState));
  19. }
  20. }

代码示例来源:origin: com.netflix.eureka/eureka2-write-server

  1. @Override
  2. public Observable<Void> unregister(final String instanceId) {
  3. if (state.get() != STATE.Connected) {
  4. return invalidStateError();
  5. }
  6. return connect().switchMap(new Func1<MessageConnection, Observable<Void>>() {
  7. @Override
  8. public Observable<Void> call(MessageConnection connection) {
  9. return sendOnConnection(connection, new UnregisterCopy(instanceId));
  10. }
  11. });
  12. }

代码示例来源:origin: com.netflix.eureka/eureka2-write-server

  1. @Override
  2. public Observable<Void> register(final InstanceInfo instanceInfo) {
  3. if (state.get() != STATE.Connected) {
  4. return invalidStateError();
  5. }
  6. return connect().switchMap(new Func1<MessageConnection, Observable<Void>>() {
  7. @Override
  8. public Observable<Void> call(MessageConnection connection) {
  9. return sendOnConnection(connection, new RegisterCopy(instanceInfo));
  10. }
  11. });
  12. }

代码示例来源:origin: com.netflix.eureka/eureka2-client

  1. @Override
  2. public Observable<Void> register(final InstanceInfo instanceInfo) {
  3. if (!moveToState(STATE.Idle, STATE.Registered) && state.get() != STATE.Registered) {
  4. STATE currentState = state.get();
  5. if (currentState == STATE.Closed) {
  6. return Observable.error(CHANNEL_CLOSED_EXCEPTION);
  7. } else {
  8. return Observable.error(new IllegalStateException(
  9. "Error advancing to state Registered from state " + currentState));
  10. }
  11. }
  12. return connect().switchMap(new Func1<MessageConnection, Observable<? extends Void>>() {
  13. @Override
  14. public Observable<? extends Void> call(MessageConnection connection) {
  15. return sendExpectAckOnConnection(connection, new Register(instanceInfo));
  16. }
  17. });
  18. }

代码示例来源:origin: com.netflix.eureka2/eureka-client

  1. @Override
  2. public Observable<Void> unregister() {
  3. if (!moveToState(STATES.Registered, STATES.Closed)) {
  4. STATES currentState = state.get();
  5. if (currentState == STATES.Idle) {
  6. return Observable.error(INSTANCE_NOT_REGISTERED_EXCEPTION);
  7. }
  8. if (currentState == STATES.Closed) {
  9. return Observable.error(CHANNEL_CLOSED_EXCEPTION);
  10. }
  11. return Observable.error(new IllegalStateException("Unrecognized channel state: " + currentState));
  12. }
  13. //TODO: Need to serialize register -> update -> unregister. With this code both they can be interleaved
  14. return connect().switchMap(new Func1<MessageConnection, Observable<? extends Void>>() {
  15. @Override
  16. public Observable<? extends Void> call(MessageConnection connection) {
  17. return connection.submitWithAck(Unregister.INSTANCE);
  18. }
  19. });
  20. }

代码示例来源:origin: com.netflix.eureka2/eureka-client

  1. @Override
  2. public Observable<Void> register(final InstanceInfo instanceInfo) {
  3. if (!moveToState(STATES.Idle, STATES.Registered)) {// State check. Only register if the state is Idle.
  4. STATES currentState = state.get();
  5. switch (currentState) {
  6. case Registered:
  7. return Observable.error(INSTANCE_ALREADY_REGISTERED_EXCEPTION);
  8. case Closed:
  9. return Observable.error(CHANNEL_CLOSED_EXCEPTION);
  10. }
  11. }
  12. //TODO: Need to serialize register -> update -> unregister. With this code both they can be interleaved
  13. return connect().switchMap(new Func1<MessageConnection, Observable<? extends Void>>() {
  14. @Override
  15. public Observable<? extends Void> call(MessageConnection connection) {
  16. return connection.submitWithAck(new Register(instanceInfo));
  17. }
  18. });
  19. }

代码示例来源:origin: patrick-doyle/android-rxmvp-tutorial

  1. private Subscription observeLookupButton() {
  2. return view.observeButton()
  3. .doOnNext(__ -> view.showLoading(true))
  4. .map(__ -> view.getUsernameEdit())
  5. .observeOn(Schedulers.io())
  6. .switchMap(username -> model.getUserReops(username))
  7. .observeOn(AndroidSchedulers.mainThread())
  8. .doOnNext(gitHubRepoList -> model.saveRepoListState(gitHubRepoList))
  9. .doOnEach(__ -> view.showLoading(false))
  10. .retry()
  11. .subscribe(gitHubRepoList -> {
  12. model.startRepoActivity(gitHubRepoList);
  13. });
  14. }
  15. }

代码示例来源:origin: akarnokd/akarnokd-misc

  1. @Test
  2. public void test() throws Exception {
  3. Observable<String> delay = Observable.just("")
  4. .switchMap(dummy -> Observable.timer(randomTime(), TimeUnit.SECONDS))
  5. .map( a -> String.valueOf(a) )
  6. .repeat();
  7. Observable<String> messages = Observable.just("Test") //eventually lines from a file...
  8. .repeat();
  9. messages.zipWith(delay, (d, msg) -> ""+d+" "+msg ).subscribe( System.out::println );
  10. Thread.sleep(10000);
  11. }
  12. }

代码示例来源:origin: akarnokd/akarnokd-misc

  1. static <T> Observable.Transformer<T, T> debounceFirst(long timeout, TimeUnit unit) {
  2. return f ->
  3. f.publish(g ->
  4. g.take(1)
  5. .concatWith(
  6. g.switchMap(u -> Observable.timer(timeout, unit).map(w -> u))
  7. .take(1)
  8. .ignoreElements()
  9. )
  10. .repeatWhen(h -> h.takeUntil(g.ignoreElements()))
  11. )
  12. ;
  13. }
  14. }

相关文章

Observable类方法