io.reactivex.Observable.distinct()方法的使用及代码示例

x33g5p2x  于2022-01-25 转载在 其他  
字(8.2k)|赞(0)|评价(0)|浏览(123)

本文整理了Java中io.reactivex.Observable.distinct()方法的一些代码示例,展示了Observable.distinct()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Observable.distinct()方法的具体详情如下:
包路径:io.reactivex.Observable
类名称:Observable
方法名:distinct

Observable.distinct介绍

[英]Returns an Observable that emits all items emitted by the source ObservableSource that are distinct based on Object#equals(Object) comparison.

It is recommended the elements' class T in the flow overrides the default Object.equals()and Object#hashCode() to provide meaningful comparison between items as the default Java implementation only considers reference equivalence.

By default, distinct() uses an internal java.util.HashSet per Observer to remember previously seen items and uses java.util.Set#add(Object) returning false as the indicator for duplicates.

Note that this internal HashSet may grow unbounded as items won't be removed from it by the operator. Therefore, using very long or infinite upstream (with very distinct elements) may lead to OutOfMemoryError.

Customizing the retention policy can happen only by providing a custom java.util.Collection implementation to the #distinct(Function,Callable) overload. Scheduler: distinct does not operate by default on a particular Scheduler.
[中]返回一个Observable,它将根据对象#equals(Object)比较,发出源ObservableSource发出的所有不同项。
建议流中元素的类T覆盖默认对象。equals()和Object#hashCode()提供项之间有意义的比较,因为默认Java实现只考虑引用等价性。
默认情况下,distinct()使用内部java。util。HashSet per Observer可以记住以前看到的项目,并使用java。util。将#add(Object)returning false设置为重复项的指示符。
请注意,这个内部哈希集可能会无限增长,因为操作符不会从中删除项。因此,使用非常长或无限长的上游(具有非常独特的元素)可能会导致OutOfMemory错误。
定制保留策略只能通过提供一个自定义java来实现。util。集合实现到#distinct(函数,可调用)重载。调度程序:默认情况下,distinct不会在特定调度程序上运行。

代码示例

代码示例来源:origin: nanchen2251/RxJava2Examples

@Override
  protected void doSomething() {
    Observable.just(1, 1, 1, 2, 2, 3, 4, 5)
        .distinct()
        .subscribe(new Consumer<Integer>() {
          @Override
          public void accept(@NonNull Integer integer) throws Exception {
            mRxOperatorsText.append("distinct : " + integer + "\n");
            Log.e(TAG, "distinct : " + integer + "\n");
          }
        });
  }
}

代码示例来源:origin: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void distinctSupplierNull() {
  just1.distinct(new Function<Integer, Object>() {
    @Override
    public Object apply(Integer v) {
      return v;
    }
  }, null);
}

代码示例来源:origin: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void distinctFunctionNull() {
  just1.distinct(null);
}

代码示例来源:origin: amitshekhariitbhu/RxJava2-Android-Samples

private void doSomeWork() {
  getObservable()
      .distinct()
      .subscribe(getObserver());
}

代码示例来源:origin: ReactiveX/RxJava

@Test
@Ignore("Null values no longer allowed")
public void testDistinctOfSourceWithExceptionsFromKeySelector() {
  Observable<String> src = Observable.just("a", "b", null, "c");
  src.distinct(TO_UPPER_WITH_EXCEPTION).subscribe(w);
  InOrder inOrder = inOrder(w);
  inOrder.verify(w, times(1)).onNext("a");
  inOrder.verify(w, times(1)).onNext("b");
  inOrder.verify(w, times(1)).onError(any(NullPointerException.class));
  inOrder.verify(w, never()).onNext(anyString());
  inOrder.verify(w, never()).onComplete();
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void fusedClear() {
  Observable.just(1, 1, 2, 1, 3, 2, 4, 5, 4)
  .distinct()
  .subscribe(new Observer<Integer>() {
    @Override
    public void onSubscribe(Disposable d) {
      QueueDisposable<?> qd = (QueueDisposable<?>)d;
      assertFalse(qd.isEmpty());
      qd.clear();
      assertTrue(qd.isEmpty());
    }
    @Override
    public void onNext(Integer value) {
    }
    @Override
    public void onError(Throwable e) {
    }
    @Override
    public void onComplete() {
    }
  });
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testDistinctOfNormalSource() {
  Observable<String> src = Observable.just("a", "b", "c", "c", "c", "b", "b", "a", "e");
  src.distinct().subscribe(w);
  InOrder inOrder = inOrder(w);
  inOrder.verify(w, times(1)).onNext("a");
  inOrder.verify(w, times(1)).onNext("b");
  inOrder.verify(w, times(1)).onNext("c");
  inOrder.verify(w, times(1)).onNext("e");
  inOrder.verify(w, times(1)).onComplete();
  inOrder.verify(w, never()).onNext(anyString());
  verify(w, never()).onError(any(Throwable.class));
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testDistinctOfNone() {
  Observable<String> src = Observable.empty();
  src.distinct().subscribe(w);
  verify(w, never()).onNext(anyString());
  verify(w, never()).onError(any(Throwable.class));
  verify(w, times(1)).onComplete();
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testDistinctOfNoneWithKeySelector() {
  Observable<String> src = Observable.empty();
  src.distinct(TO_UPPER_WITH_EXCEPTION).subscribe(w);
  verify(w, never()).onNext(anyString());
  verify(w, never()).onError(any(Throwable.class));
  verify(w, times(1)).onComplete();
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testDistinctOfNormalSourceWithKeySelector() {
  Observable<String> src = Observable.just("a", "B", "c", "C", "c", "B", "b", "a", "E");
  src.distinct(TO_UPPER_WITH_EXCEPTION).subscribe(w);
  InOrder inOrder = inOrder(w);
  inOrder.verify(w, times(1)).onNext("a");
  inOrder.verify(w, times(1)).onNext("B");
  inOrder.verify(w, times(1)).onNext("c");
  inOrder.verify(w, times(1)).onNext("E");
  inOrder.verify(w, times(1)).onComplete();
  inOrder.verify(w, never()).onNext(anyString());
  verify(w, never()).onError(any(Throwable.class));
}

代码示例来源:origin: ReactiveX/RxJava

@Test
@Ignore("Null values no longer allowed")
public void testDistinctOfSourceWithNulls() {
  Observable<String> src = Observable.just(null, "a", "a", null, null, "b", null);
  src.distinct().subscribe(w);
  InOrder inOrder = inOrder(w);
  inOrder.verify(w, times(1)).onNext(null);
  inOrder.verify(w, times(1)).onNext("a");
  inOrder.verify(w, times(1)).onNext("b");
  inOrder.verify(w, times(1)).onComplete();
  inOrder.verify(w, never()).onNext(anyString());
  verify(w, never()).onError(any(Throwable.class));
}

代码示例来源:origin: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void distinctFunctionReturnsNull() {
  just1.distinct(new Function<Integer, Object>() {
    @Override
    public Object apply(Integer v) {
      return null;
    }
  }).blockingSubscribe();
}

代码示例来源:origin: ReactiveX/RxJava

@SchedulerSupport(SchedulerSupport.NONE)
public final <K> Observable<T> distinct(Function<? super T, K> keySelector) {
  return distinct(keySelector, Functions.createHashSet());

代码示例来源:origin: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void distinctSupplierReturnsNull() {
  just1.distinct(new Function<Integer, Object>() {
    @Override
    public Object apply(Integer v) {
      return v;
    }
  }, new Callable<Collection<Object>>() {
    @Override
    public Collection<Object> call() {
      return null;
    }
  }).blockingSubscribe();
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void fusedSync() {
  TestObserver<Integer> to = ObserverFusion.newTest(QueueFuseable.ANY);
  Observable.just(1, 1, 2, 1, 3, 2, 4, 5, 4)
  .distinct()
  .subscribe(to);
  ObserverFusion.assertFusion(to, QueueFuseable.SYNC)
  .assertResult(1, 2, 3, 4, 5);
}

代码示例来源:origin: ReactiveX/RxJava

@SchedulerSupport(SchedulerSupport.NONE)
public final Observable<T> distinct() {
  return distinct(Functions.identity(), Functions.createHashSet());

代码示例来源:origin: ReactiveX/RxJava

@Test
public void collectionSupplierThrows() {
  Observable.just(1)
  .distinct(Functions.identity(), new Callable<Collection<Object>>() {
    @Override
    public Collection<Object> call() throws Exception {
      throw new TestException();
    }
  })
  .test()
  .assertFailure(TestException.class);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void collectionSupplierIsNull() {
  Observable.just(1)
  .distinct(Functions.identity(), new Callable<Collection<Object>>() {
    @Override
    public Collection<Object> call() throws Exception {
      return null;
    }
  })
  .test()
  .assertFailure(NullPointerException.class)
  .assertErrorMessage("The collectionSupplier returned a null collection. Null values are generally not allowed in 2.x operators and sources.");
}

代码示例来源:origin: ReactiveX/RxJava

/** Issue #2587. */
@Test
public void testRepeatAndDistinctUnbounded() {
  Observable<Integer> src = Observable.fromIterable(Arrays.asList(1, 2, 3, 4, 5))
      .take(3)
      .repeat(3)
      .distinct();
  TestObserver<Integer> to = new TestObserver<Integer>();
  src.subscribe(to);
  to.assertNoErrors();
  to.assertTerminated();
  to.assertValues(1, 2, 3);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void error() {
  Observable.error(new TestException())
  .distinct()
  .test()
  .assertFailure(TestException.class);
}

相关文章

Observable类方法