rx.Observable.distinctUntilChanged()方法的使用及代码示例

x33g5p2x  于2022-01-25 转载在 其他  
字(8.3k)|赞(0)|评价(0)|浏览(141)

本文整理了Java中rx.Observable.distinctUntilChanged()方法的一些代码示例,展示了Observable.distinctUntilChanged()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Observable.distinctUntilChanged()方法的具体详情如下:
包路径:rx.Observable
类名称:Observable
方法名:distinctUntilChanged

Observable.distinctUntilChanged介绍

[英]Returns an Observable that emits all items emitted by the source Observable that are distinct from their immediate predecessors.

Scheduler: distinctUntilChanged does not operate by default on a particular Scheduler.
[中]返回一个Observable,该Observable发出源Observable发出的与前一个Observable不同的所有项。
调度程序:默认情况下,distinctUntilChanged不会在特定调度程序上运行。

代码示例

代码示例来源:origin: apache/usergrid

  1. private InnerIterator( int maxSize ) {
  2. queue = new ArrayBlockingQueue<>( maxSize );
  3. }

代码示例来源:origin: ReactiveX/RxNetty

  1. @Override
  2. public ConnectionProvider<W, R> newProvider(Observable<HostConnector<W, R>> hosts) {
  3. return new ConnectionProviderImpl(hosts.map(new Func1<HostConnector<W, R>, HostHolder<W, R>>() {
  4. @Override
  5. public HostHolder<W, R> call(HostConnector<W, R> connector) {
  6. HostHolder<W, R> newHolder = strategy.toHolder(connector);
  7. connector.subscribe(newHolder.getEventListener());
  8. return newHolder;
  9. }
  10. }).flatMap(new Func1<HostHolder<W, R>, Observable<HostUpdate<W, R>>>() {
  11. @Override
  12. public Observable<HostUpdate<W, R>> call(HostHolder<W, R> holder) {
  13. return holder.getConnector()
  14. .getHost()
  15. .getCloseNotifier()
  16. .map(new VoidToAnythingCast<HostUpdate<W, R>>())
  17. .ignoreElements()
  18. .onErrorResumeNext(Observable.<HostUpdate<W, R>>empty())
  19. .concatWith(Observable.just(new HostUpdate<>(Action.Remove, holder)))
  20. .mergeWith(Observable.just(new HostUpdate<>(Action.Add, holder)));
  21. }
  22. }).flatMap(newCollector(collector.<W, R>newCollector()), 1).distinctUntilChanged());
  23. }

代码示例来源:origin: grandcentrix/ThirtyInch

  1. }).distinctUntilChanged();

代码示例来源:origin: jhusain/learnrxjava

  1. .distinctUntilChanged(PlayEvent::getSession)
  2. .onErrorResumeNext(t -> {
  3. System.out.println(" ***** complete group: " + groupedObservable.getKey());

代码示例来源:origin: leeowenowen/rxjava-examples

  1. @Override
  2. public void run() {
  3. Observable.just(1, 1, 2, 2, 3, 4, 4, 1, 1, 5)
  4. .distinctUntilChanged()
  5. .subscribe(new Action1<Integer>() {
  6. @Override
  7. public void call(Integer integer) {
  8. log(integer);
  9. }
  10. });
  11. }
  12. });

代码示例来源:origin: com.netflix.eureka/eureka2-client

  1. @Override
  2. public Observable<BufferState> forInterest(final Interest<T> interest) {
  3. return Observable.create(new OnSubscribe<BufferState>() {
  4. @Override
  5. public void call(Subscriber<? super BufferState> subscriber) {
  6. updatesSubject.pause();
  7. try {
  8. BufferState current = shouldBatch(interest);
  9. subscriber.onNext(current);
  10. updatesSubject.map(new Func1<Boolean, BufferState>() {
  11. @Override
  12. public BufferState call(Boolean tick) {
  13. return shouldBatch(interest);
  14. }
  15. }).subscribe(subscriber);
  16. } finally {
  17. updatesSubject.resume();
  18. }
  19. }
  20. }).distinctUntilChanged();
  21. }

代码示例来源:origin: hanks-zyh/RxSerach

  1. private Observable<Boolean> getConnectedObservable(Context context) {
  2. return BroadcastObservable.fromConnectivityManager(context)
  3. .distinctUntilChanged()
  4. .filter(new Func1<Boolean, Boolean>() {
  5. @Override public Boolean call(Boolean isConnected) {
  6. return isConnected;
  7. }
  8. });
  9. }

代码示例来源:origin: com.netflix.eureka/eureka2-client

  1. public Observable<Interest<InstanceInfo>> interestChangeStream() {
  2. return interestSubject.asObservable().distinctUntilChanged();
  3. }

代码示例来源:origin: Aptoide/aptoide-client-v8

  1. @Override public Observable<Download> getDownloadsByMd5(String md5) {
  2. return downloadsRepository.getDownloadListByMd5(md5)
  3. .flatMap(downloads -> Observable.from(downloads)
  4. .filter(download -> download != null || isFileMissingFromCompletedDownload(download))
  5. .toList())
  6. .map(downloads -> {
  7. if (downloads.isEmpty()) {
  8. return null;
  9. } else {
  10. return downloads.get(0);
  11. }
  12. })
  13. .distinctUntilChanged();
  14. }

代码示例来源:origin: Laimiux/rxnetwork-android

  1. /**
  2. * Creates an observable that listens to connectivity changes
  3. */
  4. public static Observable<Boolean> stream(Context context) {
  5. final Context applicationContext = context.getApplicationContext();
  6. final IntentFilter action = new IntentFilter(ConnectivityManager.CONNECTIVITY_ACTION);
  7. return ContentObservable.fromBroadcast(context, action)
  8. // To get initial connectivity status
  9. .startWith((Intent) null)
  10. .map(new Func1<Intent, Boolean>() {
  11. @Override public Boolean call(Intent ignored) {
  12. return getConnectivityStatus(applicationContext);
  13. }
  14. }).distinctUntilChanged();
  15. }
  16. }

代码示例来源:origin: Aptoide/aptoide-client-v8

  1. public Observable<Download> getAsListDownload(String md5) {
  2. return downloadAccessor.getAsList(md5)
  3. .map(downloads -> {
  4. for (int i = 0; i < downloads.size(); i++) {
  5. Download download = downloads.get(i);
  6. if (download == null || (download.getOverallDownloadStatus() == Download.COMPLETED
  7. && getStateIfFileExists(download) == Download.FILE_MISSING)) {
  8. downloads.remove(i);
  9. i--;
  10. }
  11. }
  12. if (downloads.isEmpty()) {
  13. return null;
  14. } else {
  15. return downloads.get(0);
  16. }
  17. })
  18. .distinctUntilChanged();
  19. }

代码示例来源:origin: org.hawkular.metrics/hawkular-metrics-core-service

  1. @Override
  2. public Observable<DataPoint<String>> findStringData(MetricId<String> id, long start, long end, boolean distinct,
  3. int limit, Order order) {
  4. checkArgument(isValidTimeRange(start, end));
  5. if (distinct) {
  6. return findDataPoints(id, start, end, limit, order).distinctUntilChanged(DataPoint::getValue);
  7. } else {
  8. return findDataPoints(id, start, end, limit, order);
  9. }
  10. }

代码示例来源:origin: hawkular/hawkular-metrics

  1. @Override
  2. public Observable<DataPoint<String>> findStringData(MetricId<String> id, long start, long end, boolean distinct,
  3. int limit, Order order) {
  4. checkArgument(isValidTimeRange(start, end));
  5. if (distinct) {
  6. return findDataPoints(id, start, end, limit, order).distinctUntilChanged(DataPoint::getValue);
  7. } else {
  8. return findDataPoints(id, start, end, limit, order);
  9. }
  10. }

代码示例来源:origin: au.gov.amsa.risky/formats

  1. public static <T extends Fix> Transformer<T, T> ignoreOutOfOrderFixes(final boolean enabled) {
  2. return o -> {
  3. return o.scan((a, b) -> {
  4. if (!enabled || b.time() > a.time())
  5. return b;
  6. else
  7. return a;
  8. }).distinctUntilChanged();
  9. };
  10. }

代码示例来源:origin: com.netflix.eureka/eureka2-server

  1. public void init() {
  2. Observable<InstanceInfo> selfInfoStream = resolve().distinctUntilChanged();
  3. subscription = connect(selfInfoStream).retryWhen(new RetryStrategyFunc(500)).subscribe();
  4. }

代码示例来源:origin: hawkular/hawkular-metrics

  1. @Override
  2. public Observable<DataPoint<AvailabilityType>> findAvailabilityData(MetricId<AvailabilityType> id, long start,
  3. long end, boolean distinct, int limit, Order order) {
  4. checkArgument(isValidTimeRange(start, end), "Invalid time range");
  5. if (distinct) {
  6. Observable<DataPoint<AvailabilityType>> availabilityData = findDataPoints(id, start, end, 0, order)
  7. .distinctUntilChanged(DataPoint::getValue);
  8. if (limit <= 0) {
  9. return availabilityData;
  10. } else {
  11. return availabilityData.limit(limit);
  12. }
  13. } else {
  14. return findDataPoints(id, start, end, limit, order);
  15. }
  16. }

代码示例来源:origin: org.hawkular.metrics/hawkular-metrics-core-service

  1. @Override
  2. public Observable<DataPoint<AvailabilityType>> findAvailabilityData(MetricId<AvailabilityType> id, long start,
  3. long end, boolean distinct, int limit, Order order) {
  4. checkArgument(isValidTimeRange(start, end), "Invalid time range");
  5. if (distinct) {
  6. Observable<DataPoint<AvailabilityType>> availabilityData = findDataPoints(id, start, end, 0, order)
  7. .distinctUntilChanged(DataPoint::getValue);
  8. if (limit <= 0) {
  9. return availabilityData;
  10. } else {
  11. return availabilityData.limit(limit);
  12. }
  13. } else {
  14. return findDataPoints(id, start, end, limit, order);
  15. }
  16. }

代码示例来源:origin: nurkiewicz/rxjava-book-examples

  1. @Test
  2. public void sample_516() throws Exception {
  3. Observable<Weather> measurements = Observable.empty();
  4. Observable<Weather> tempChanges = measurements
  5. .distinctUntilChanged(Weather::getTemperature);
  6. }

代码示例来源:origin: nurkiewicz/rxjava-book-examples

  1. @Test
  2. public void sample_89() throws Exception {
  3. Observable
  4. .interval(10, TimeUnit.MILLISECONDS)
  5. .map(x -> getOrderBookLength())
  6. .distinctUntilChanged();
  7. }

代码示例来源:origin: au.gov.amsa.risky/formats

  1. @Override
  2. public Observable<T> call(Observable<T> fixes) {
  3. Observable<T> result = fixes.scan((latest, fix) -> {
  4. if (fix.fix().mmsi() != latest.fix().mmsi())
  5. throw new RuntimeException("can only downsample a single vessel");
  6. else if (fix.fix().time() < latest.fix().time())
  7. throw new RuntimeException("not in ascending time order!");
  8. else if (fix.fix().time() - latest.fix().time() >= minTimeBetweenFixesMs
  9. || selector.call(fix))
  10. return fix;
  11. else
  12. return latest;
  13. });
  14. if (minTimeBetweenFixesMs > 0)
  15. // throw away repeats
  16. result = result.distinctUntilChanged(f -> f.fix().time());
  17. return result;
  18. }

相关文章

Observable类方法