Usage and code examples of the io.reactivex.Observable.distinctUntilChanged() method

x33g5p2x, reposted on 2022-01-25 under Other

This article collects code examples of the io.reactivex.Observable.distinctUntilChanged() method in Java and shows how Observable.distinctUntilChanged() is used in practice. The examples come mainly from GitHub, Stack Overflow, and Maven, and were extracted from selected open-source projects. The method is identified as follows:
Package: io.reactivex
Class name: Observable
Method name: distinctUntilChanged

Introduction to Observable.distinctUntilChanged

Returns an Observable that emits all items emitted by the source ObservableSource that are distinct from their immediate predecessors based on Object.equals(Object) comparison.

It is recommended the elements' class T in the flow overrides the default Object.equals() to provide meaningful comparison between items as the default Java implementation only considers reference equivalence. Alternatively, use the distinctUntilChanged(BiPredicate) overload and provide a comparison function in case the class T can't be overridden with custom equals() or the comparison itself should happen on different terms or properties of the class T.

Note that the operator always retains the latest item from upstream regardless of the comparison result and uses it in the next comparison with the next upstream item.

Scheduler: distinctUntilChanged does not operate by default on a particular Scheduler.
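The behaviour described above can be modelled in plain Java over a list. This is only a sketch under stated assumptions: the class DistinctUntilChangedSketch and its static helpers are hypothetical names, they work on java.util.List rather than on an Observable, and they are not RxJava's implementation. The sketch shows the three points from the description: comparison against the immediate predecessor, an optional key selector with a custom comparer, and the latest upstream item always being retained for the next comparison.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.function.BiPredicate;
import java.util.function.Function;

// Hypothetical plain-Java model of the operator's semantics; NOT RxJava code.
final class DistinctUntilChangedSketch {

    // Keeps an item only when its key differs from the key of the item right before it.
    static <T, K> List<T> distinctUntilChanged(List<T> source,
                                               Function<? super T, ? extends K> keySelector,
                                               BiPredicate<? super K, ? super K> comparer) {
        List<T> out = new ArrayList<>();
        boolean hasPrevious = false;
        K previousKey = null;
        for (T item : source) {
            K key = keySelector.apply(item);
            boolean same = hasPrevious && comparer.test(previousKey, key);
            // The latest key is always retained, whether or not the item is kept.
            previousKey = key;
            hasPrevious = true;
            if (!same) {
                out.add(item);
            }
        }
        return out;
    }

    // No-argument flavour: the item itself is the key, compared with equals().
    static <T> List<T> distinctUntilChanged(List<T> source) {
        return distinctUntilChanged(source, Function.<T>identity(), Objects::equals);
    }

    public static void main(String[] args) {
        System.out.println(distinctUntilChanged(
                Arrays.asList("a", "b", "c", "c", "c", "b", "b", "a", "e")));
        // prints [a, b, c, b, a, e]

        System.out.println(distinctUntilChanged(
                Arrays.asList("a", "b", "c", "C", "c", "B", "b", "a", "e"),
                String::toUpperCase, Objects::equals));
        // prints [a, b, c, B, a, e]
    }
}
```

Only consecutive repeats are dropped: "b" and "a" appear again later in the output because their immediate predecessors were different items.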

Code examples

Code example source: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void distinctUntilChangedFunctionNull() {
  just1.distinctUntilChanged((Function<Object, Object>)null);
}

Code example source: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void distinctUntilChangedBiPredicateNull() {
  just1.distinctUntilChanged((BiPredicate<Object, Object>)null);
}

Code example source: ReactiveX/RxJava

@Test
@Ignore("Null values no longer allowed")
public void testDistinctUntilChangedOfSourceWithExceptionsFromKeySelector() {
  Observable<String> src = Observable.just("a", "b", null, "c");
  src.distinctUntilChanged(TO_UPPER_WITH_EXCEPTION).subscribe(w);
  InOrder inOrder = inOrder(w);
  inOrder.verify(w, times(1)).onNext("a");
  inOrder.verify(w, times(1)).onNext("b");
  verify(w, times(1)).onError(any(NullPointerException.class));
  inOrder.verify(w, never()).onNext(anyString());
  inOrder.verify(w, never()).onComplete();
}

Code example source: ReactiveX/RxJava

@Test
public void testDistinctUntilChangedOfNone() {
  Observable<String> src = Observable.empty();
  src.distinctUntilChanged().subscribe(w);
  verify(w, never()).onNext(anyString());
  verify(w, never()).onError(any(Throwable.class));
  verify(w, times(1)).onComplete();
}

Code example source: ReactiveX/RxJava

@Test
public void testDistinctUntilChangedOfNoneWithKeySelector() {
  Observable<String> src = Observable.empty();
  src.distinctUntilChanged(TO_UPPER_WITH_EXCEPTION).subscribe(w);
  verify(w, never()).onNext(anyString());
  verify(w, never()).onError(any(Throwable.class));
  verify(w, times(1)).onComplete();
}

Code example source: ReactiveX/RxJava

@Test
public void testDistinctUntilChangedOfNormalSourceWithKeySelector() {
  Observable<String> src = Observable.just("a", "b", "c", "C", "c", "B", "b", "a", "e");
  src.distinctUntilChanged(TO_UPPER_WITH_EXCEPTION).subscribe(w);
  InOrder inOrder = inOrder(w);
  inOrder.verify(w, times(1)).onNext("a");
  inOrder.verify(w, times(1)).onNext("b");
  inOrder.verify(w, times(1)).onNext("c");
  inOrder.verify(w, times(1)).onNext("B");
  inOrder.verify(w, times(1)).onNext("a");
  inOrder.verify(w, times(1)).onNext("e");
  inOrder.verify(w, times(1)).onComplete();
  inOrder.verify(w, never()).onNext(anyString());
  verify(w, never()).onError(any(Throwable.class));
}

Code example source: ReactiveX/RxJava

@Test
public void testDistinctUntilChangedOfNormalSource() {
  Observable<String> src = Observable.just("a", "b", "c", "c", "c", "b", "b", "a", "e");
  src.distinctUntilChanged().subscribe(w);
  InOrder inOrder = inOrder(w);
  inOrder.verify(w, times(1)).onNext("a");
  inOrder.verify(w, times(1)).onNext("b");
  inOrder.verify(w, times(1)).onNext("c");
  inOrder.verify(w, times(1)).onNext("b");
  inOrder.verify(w, times(1)).onNext("a");
  inOrder.verify(w, times(1)).onNext("e");
  inOrder.verify(w, times(1)).onComplete();
  inOrder.verify(w, never()).onNext(anyString());
  verify(w, never()).onError(any(Throwable.class));
}

Code example source: ReactiveX/RxJava

@SchedulerSupport(SchedulerSupport.NONE)
public final Observable<T> distinctUntilChanged() {
  return distinctUntilChanged(Functions.identity());
}

Code example source: ReactiveX/RxJava

@Test
@Ignore("Null values no longer allowed")
public void testDistinctUntilChangedOfSourceWithNulls() {
  Observable<String> src = Observable.just(null, "a", "a", null, null, "b", null, null);
  src.distinctUntilChanged().subscribe(w);
  InOrder inOrder = inOrder(w);
  inOrder.verify(w, times(1)).onNext(null);
  inOrder.verify(w, times(1)).onNext("a");
  inOrder.verify(w, times(1)).onNext(null);
  inOrder.verify(w, times(1)).onNext("b");
  inOrder.verify(w, times(1)).onNext(null);
  inOrder.verify(w, times(1)).onComplete();
  inOrder.verify(w, never()).onNext(anyString());
  verify(w, never()).onError(any(Throwable.class));
}

Code example source: amitshekhariitbhu/RxJava2-Android-Samples

private void setUpSearchObservable() {
  RxSearchObservable.fromView(searchView)
      .debounce(300, TimeUnit.MILLISECONDS)
      .filter(new Predicate<String>() {
        @Override
        public boolean test(String text) {
          if (text.isEmpty()) {
            textViewResult.setText("");
            return false;
          } else {
            return true;
          }
        }
      })
      .distinctUntilChanged()
      .switchMap(new Function<String, ObservableSource<String>>() {
        @Override
        public ObservableSource<String> apply(String query) {
          return dataFromNetwork(query);
        }
      })
      .subscribeOn(Schedulers.io())
      .observeOn(AndroidSchedulers.mainThread())
      .subscribe(new Consumer<String>() {
        @Override
        public void accept(String result) {
          textViewResult.setText(result);
        }
      });
}

Code example source: pwittchen/ReactiveNetwork

@Override public Observable<Connectivity> observeNetworkConnectivity(final Context context) {
 final String service = Context.CONNECTIVITY_SERVICE;
 final ConnectivityManager manager = (ConnectivityManager) context.getSystemService(service);
 return Observable.create(new ObservableOnSubscribe<Connectivity>() {
  @Override public void subscribe(ObservableEmitter<Connectivity> subscriber) throws Exception {
   networkCallback = createNetworkCallback(subscriber, context);
   final NetworkRequest networkRequest = new NetworkRequest.Builder().build();
   manager.registerNetworkCallback(networkRequest, networkCallback);
  }
 }).doOnDispose(new Action() {
  @Override public void run() {
   tryToUnregisterCallback(manager);
  }
 }).startWith(Connectivity.create(context)).distinctUntilChanged();
}

Code example source: redisson/redisson

/**
 * Returns an Observable that emits all items emitted by the source ObservableSource that are distinct from their
 * immediate predecessors based on {@link Object#equals(Object)} comparison.
 * <p>
 * <img width="640" height="310" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/distinctUntilChanged.png" alt="">
 * <p>
 * It is recommended the elements' class {@code T} in the flow overrides the default {@code Object.equals()} to provide
 * meaningful comparison between items as the default Java implementation only considers reference equivalence.
 * Alternatively, use the {@link #distinctUntilChanged(BiPredicate)} overload and provide a comparison function
 * in case the class {@code T} can't be overridden with custom {@code equals()} or the comparison itself
 * should happen on different terms or properties of the class {@code T}.
 * <p>
 * Note that the operator always retains the latest item from upstream regardless of the comparison result
 * and uses it in the next comparison with the next upstream item.
 * <dl>
 *  <dt><b>Scheduler:</b></dt>
 *  <dd>{@code distinctUntilChanged} does not operate by default on a particular {@link Scheduler}.</dd>
 * </dl>
 *
 * @return an Observable that emits those items from the source ObservableSource that are distinct from their
 *         immediate predecessors
 * @see <a href="http://reactivex.io/documentation/operators/distinct.html">ReactiveX operators documentation: Distinct</a>
 * @see #distinctUntilChanged(BiPredicate)
 */
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final Observable<T> distinctUntilChanged() {
  return distinctUntilChanged(Functions.identity());
}

Code example source: pwittchen/ReactiveNetwork

@Override public Observable<Boolean> observeInternetConnectivity(final int initialIntervalInMs,
  final int intervalInMs, final String host, final int port, final int timeoutInMs,
  final int httpResponse, final ErrorHandler errorHandler) {
 Preconditions.checkGreaterOrEqualToZero(initialIntervalInMs,
   "initialIntervalInMs is not a positive number");
 Preconditions.checkGreaterThanZero(intervalInMs, "intervalInMs is not a positive number");
 checkGeneralPreconditions(host, port, timeoutInMs, errorHandler);
 final String adjustedHost = adjustHost(host);
 return Observable.interval(initialIntervalInMs, intervalInMs, TimeUnit.MILLISECONDS,
   Schedulers.io()).map(new Function<Long, Boolean>() {
  @Override public Boolean apply(@NonNull Long tick) throws Exception {
   return isConnected(adjustedHost, port, timeoutInMs, errorHandler);
  }
 }).distinctUntilChanged();
}

Code example source: pwittchen/ReactiveNetwork

@Override public Observable<Boolean> observeInternetConnectivity(final int initialIntervalInMs,
  final int intervalInMs, final String host, final int port, final int timeoutInMs,
  final int httpResponse,
  final ErrorHandler errorHandler) {
 Preconditions.checkGreaterOrEqualToZero(initialIntervalInMs,
   "initialIntervalInMs is not a positive number");
 Preconditions.checkGreaterThanZero(intervalInMs, "intervalInMs is not a positive number");
 checkGeneralPreconditions(host, port, timeoutInMs, httpResponse, errorHandler);
 final String adjustedHost = adjustHost(host);
 return Observable.interval(initialIntervalInMs, intervalInMs, TimeUnit.MILLISECONDS,
   Schedulers.io()).map(new Function<Long, Boolean>() {
  @Override public Boolean apply(@NonNull Long tick) {
   return isConnected(adjustedHost, port, timeoutInMs, httpResponse, errorHandler);
  }
 }).distinctUntilChanged();
}

Code example source: ReactiveX/RxJava

@Test
public void customComparator() {
  Observable<String> source = Observable.just("a", "b", "B", "A", "a", "C");
  TestObserver<String> to = TestObserver.create();
  source.distinctUntilChanged(new BiPredicate<String, String>() {
    @Override
    public boolean test(String a, String b) {
      return a.compareToIgnoreCase(b) == 0;
    }
  })
  .subscribe(to);
  to.assertValues("a", "b", "A", "C");
  to.assertNoErrors();
  to.assertComplete();
}
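A custom comparer like the one above is applied to each item and its immediate upstream predecessor, and that predecessor is retained even when it was not emitted. With a comparer that is not transitive, such as a tolerance check, this means slowly drifting values can all be suppressed. The following plain-Java sketch illustrates that consequence; LatestItemComparisonSketch, its list-based helper, and the tolerance of 5 are assumptions made for illustration and are not part of RxJava.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiPredicate;

// Hypothetical plain-Java model; it mirrors the documented rule that the latest
// upstream item is kept for the next comparison regardless of the result.
final class LatestItemComparisonSketch {

    static <T> List<T> distinctUntilChanged(List<T> source,
                                            BiPredicate<? super T, ? super T> comparer) {
        List<T> out = new ArrayList<>();
        boolean hasPrevious = false;
        T previous = null;
        for (T item : source) {
            boolean same = hasPrevious && comparer.test(previous, item);
            // Retain the latest item even if it is being skipped.
            previous = item;
            hasPrevious = true;
            if (!same) {
                out.add(item);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Two values count as "unchanged" when they differ by at most 5.
        BiPredicate<Integer, Integer> withinFive = (a, b) -> Math.abs(a - b) <= 5;

        System.out.println(distinctUntilChanged(Arrays.asList(10, 14, 18, 22, 40), withinFive));
        // prints [10, 40]
    }
}
```

In this run 22 is skipped although it differs from the last emitted value 10 by 12, because it is compared with 18, the latest upstream item, not with 10.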

Code example source: ReactiveX/RxJava

@Test
public void customComparatorThrows() {
  Observable<String> source = Observable.just("a", "b", "B", "A", "a", "C");
  TestObserver<String> to = TestObserver.create();
  source.distinctUntilChanged(new BiPredicate<String, String>() {
    @Override
    public boolean test(String a, String b) {
      throw new TestException();
    }
  })
  .subscribe(to);
  to.assertValue("a");
  to.assertNotComplete();
  to.assertError(TestException.class);
}

Code example source: ReactiveX/RxJava

@Test
public void distinctUntilChangedFunctionReturnsNull() {
  Observable.range(1, 2).distinctUntilChanged(new Function<Integer, Object>() {
    @Override
    public Object apply(Integer v) {
      return null;
    }
  }).test().assertResult(1);
}

Code example source: ReactiveX/RxJava

@Test
public void fused() {
  TestObserver<Integer> to = ObserverFusion.newTest(QueueFuseable.ANY);
  Observable.just(1, 2, 2, 3, 3, 4, 5)
  .distinctUntilChanged(new BiPredicate<Integer, Integer>() {
    @Override
    public boolean test(Integer a, Integer b) throws Exception {
      return a.equals(b);
    }
  })
  .subscribe(to);
  to.assertOf(ObserverFusion.<Integer>assertFuseable())
  .assertOf(ObserverFusion.<Integer>assertFusionMode(QueueFuseable.SYNC))
  .assertResult(1, 2, 3, 4, 5)
  ;
}

Code example source: Polidea/RxAndroidBle

@Override
protected void subscribeActual(Observer<? super RxBleClient.State> observer) {
  if (!rxBleAdapterWrapper.hasBluetoothAdapter()) {
    observer.onSubscribe(Disposables.empty());
    observer.onComplete();
    return;
  }

  checkPermissionUntilGranted(locationServicesStatus, timerScheduler)
      .flatMapObservable(new Function<Boolean, Observable<RxBleClient.State>>() {
        @Override
        public Observable<RxBleClient.State> apply(Boolean permissionWasInitiallyGranted) {
          return checkAdapterAndServicesState(
              permissionWasInitiallyGranted,
              rxBleAdapterWrapper,
              bleAdapterStateObservable,
              locationServicesOkObservable
          );
        }
      })
      .distinctUntilChanged()
      .subscribe(observer);
}

Code example source: ReactiveX/RxJava

@Test
public void ignoreCancel() {
  List<Throwable> errors = TestHelper.trackPluginErrors();
  try {
    Observable.wrap(new ObservableSource<Integer>() {
      @Override
      public void subscribe(Observer<? super Integer> observer) {
        observer.onSubscribe(Disposables.empty());
        observer.onNext(1);
        observer.onNext(2);
        observer.onNext(3);
        observer.onError(new IOException());
        observer.onComplete();
      }
    })
    .distinctUntilChanged(new BiPredicate<Integer, Integer>() {
      @Override
      public boolean test(Integer a, Integer b) throws Exception {
        throw new TestException();
      }
    })
    .test()
    .assertFailure(TestException.class, 1);
    TestHelper.assertUndeliverable(errors, 0, IOException.class);
  } finally {
    RxJavaPlugins.reset();
  }
}
