rx.Observable.distinctUntilChanged()方法的使用及代码示例

x33g5p2x  于2022-01-25 转载在 其他  
字(8.3k)|赞(0)|评价(0)|浏览(120)

本文整理了Java中rx.Observable.distinctUntilChanged()方法的一些代码示例,展示了Observable.distinctUntilChanged()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Observable.distinctUntilChanged()方法的具体详情如下:
包路径:rx.Observable
类名称:Observable
方法名:distinctUntilChanged

Observable.distinctUntilChanged介绍

[英]Returns an Observable that emits all items emitted by the source Observable that are distinct from their immediate predecessors.

Scheduler: distinctUntilChanged does not operate by default on a particular Scheduler.
[中]返回一个Observable,该Observable发出源Observable发出的与前一个Observable不同的所有项。
调度程序:默认情况下,distinctUntilChanged不会在特定调度程序上运行。

代码示例

代码示例来源:origin: apache/usergrid

private InnerIterator( int maxSize ) {
  queue = new ArrayBlockingQueue<>( maxSize );
}

代码示例来源:origin: ReactiveX/RxNetty

@Override
public ConnectionProvider<W, R> newProvider(Observable<HostConnector<W, R>> hosts) {
  return new ConnectionProviderImpl(hosts.map(new Func1<HostConnector<W, R>, HostHolder<W, R>>() {
    @Override
    public HostHolder<W, R> call(HostConnector<W, R> connector) {
      HostHolder<W, R> newHolder = strategy.toHolder(connector);
      connector.subscribe(newHolder.getEventListener());
      return newHolder;
    }
  }).flatMap(new Func1<HostHolder<W, R>, Observable<HostUpdate<W, R>>>() {
    @Override
    public Observable<HostUpdate<W, R>> call(HostHolder<W, R> holder) {
      return holder.getConnector()
             .getHost()
             .getCloseNotifier()
             .map(new VoidToAnythingCast<HostUpdate<W, R>>())
             .ignoreElements()
             .onErrorResumeNext(Observable.<HostUpdate<W, R>>empty())
             .concatWith(Observable.just(new HostUpdate<>(Action.Remove, holder)))
             .mergeWith(Observable.just(new HostUpdate<>(Action.Add, holder)));
    }
  }).flatMap(newCollector(collector.<W, R>newCollector()), 1).distinctUntilChanged());
}

代码示例来源:origin: grandcentrix/ThirtyInch

}).distinctUntilChanged();

代码示例来源:origin: jhusain/learnrxjava

.distinctUntilChanged(PlayEvent::getSession)
.onErrorResumeNext(t -> {
  System.out.println("     ***** complete group: " + groupedObservable.getKey());

代码示例来源:origin: leeowenowen/rxjava-examples

@Override
 public void run() {
  Observable.just(1, 1, 2, 2, 3, 4, 4, 1, 1, 5)
       .distinctUntilChanged()
       .subscribe(new Action1<Integer>() {
        @Override
        public void call(Integer integer) {
         log(integer);
        }
       });
 }
});

代码示例来源:origin: com.netflix.eureka/eureka2-client

@Override
public Observable<BufferState> forInterest(final Interest<T> interest) {
  return Observable.create(new OnSubscribe<BufferState>() {
    @Override
    public void call(Subscriber<? super BufferState> subscriber) {
      updatesSubject.pause();
      try {
        BufferState current = shouldBatch(interest);
        subscriber.onNext(current);
        updatesSubject.map(new Func1<Boolean, BufferState>() {
          @Override
          public BufferState call(Boolean tick) {
            return shouldBatch(interest);
          }
        }).subscribe(subscriber);
      } finally {
        updatesSubject.resume();
      }
    }
  }).distinctUntilChanged();
}

代码示例来源:origin: hanks-zyh/RxSerach

private Observable<Boolean> getConnectedObservable(Context context) {
  return BroadcastObservable.fromConnectivityManager(context)
      .distinctUntilChanged()
      .filter(new Func1<Boolean, Boolean>() {
        @Override public Boolean call(Boolean isConnected) {
          return isConnected;
        }
      });
}

代码示例来源:origin: com.netflix.eureka/eureka2-client

public Observable<Interest<InstanceInfo>> interestChangeStream() {
  return interestSubject.asObservable().distinctUntilChanged();
}

代码示例来源:origin: Aptoide/aptoide-client-v8

@Override public Observable<Download> getDownloadsByMd5(String md5) {
 return downloadsRepository.getDownloadListByMd5(md5)
   .flatMap(downloads -> Observable.from(downloads)
     .filter(download -> download != null || isFileMissingFromCompletedDownload(download))
     .toList())
   .map(downloads -> {
    if (downloads.isEmpty()) {
     return null;
    } else {
     return downloads.get(0);
    }
   })
   .distinctUntilChanged();
}

代码示例来源:origin: Laimiux/rxnetwork-android

/**
  * Creates an observable that listens to connectivity changes
  */
 public static Observable<Boolean> stream(Context context) {
  final Context applicationContext = context.getApplicationContext();
  final IntentFilter action = new IntentFilter(ConnectivityManager.CONNECTIVITY_ACTION);
  return ContentObservable.fromBroadcast(context, action)
    // To get initial connectivity status
    .startWith((Intent) null)
    .map(new Func1<Intent, Boolean>() {
     @Override public Boolean call(Intent ignored) {
      return getConnectivityStatus(applicationContext);
     }
    }).distinctUntilChanged();
 }
}

代码示例来源:origin: Aptoide/aptoide-client-v8

public Observable<Download> getAsListDownload(String md5) {
 return downloadAccessor.getAsList(md5)
   .map(downloads -> {
    for (int i = 0; i < downloads.size(); i++) {
     Download download = downloads.get(i);
     if (download == null || (download.getOverallDownloadStatus() == Download.COMPLETED
       && getStateIfFileExists(download) == Download.FILE_MISSING)) {
      downloads.remove(i);
      i--;
     }
    }
    if (downloads.isEmpty()) {
     return null;
    } else {
     return downloads.get(0);
    }
   })
   .distinctUntilChanged();
}

代码示例来源:origin: org.hawkular.metrics/hawkular-metrics-core-service

@Override
public Observable<DataPoint<String>> findStringData(MetricId<String> id, long start, long end, boolean distinct,
    int limit, Order order) {
  checkArgument(isValidTimeRange(start, end));
  if (distinct) {
    return findDataPoints(id, start, end, limit, order).distinctUntilChanged(DataPoint::getValue);
  } else {
    return findDataPoints(id, start, end, limit, order);
  }
}

代码示例来源:origin: hawkular/hawkular-metrics

@Override
public Observable<DataPoint<String>> findStringData(MetricId<String> id, long start, long end, boolean distinct,
    int limit, Order order) {
  checkArgument(isValidTimeRange(start, end));
  if (distinct) {
    return findDataPoints(id, start, end, limit, order).distinctUntilChanged(DataPoint::getValue);
  } else {
    return findDataPoints(id, start, end, limit, order);
  }
}

代码示例来源:origin: au.gov.amsa.risky/formats

public static <T extends Fix> Transformer<T, T> ignoreOutOfOrderFixes(final boolean enabled) {
  return o -> {
    return o.scan((a, b) -> {
      if (!enabled || b.time() > a.time())
        return b;
      else
        return a;
    }).distinctUntilChanged();
  };
}

代码示例来源:origin: com.netflix.eureka/eureka2-server

public void init() {
  Observable<InstanceInfo> selfInfoStream = resolve().distinctUntilChanged();
  subscription = connect(selfInfoStream).retryWhen(new RetryStrategyFunc(500)).subscribe();
}

代码示例来源:origin: hawkular/hawkular-metrics

@Override
public Observable<DataPoint<AvailabilityType>> findAvailabilityData(MetricId<AvailabilityType> id, long start,
    long end, boolean distinct, int limit, Order order) {
  checkArgument(isValidTimeRange(start, end), "Invalid time range");
  if (distinct) {
    Observable<DataPoint<AvailabilityType>> availabilityData = findDataPoints(id, start, end, 0, order)
        .distinctUntilChanged(DataPoint::getValue);
    if (limit <= 0) {
      return availabilityData;
    } else {
      return availabilityData.limit(limit);
    }
  } else {
    return findDataPoints(id, start, end, limit, order);
  }
}

代码示例来源:origin: org.hawkular.metrics/hawkular-metrics-core-service

@Override
public Observable<DataPoint<AvailabilityType>> findAvailabilityData(MetricId<AvailabilityType> id, long start,
    long end, boolean distinct, int limit, Order order) {
  checkArgument(isValidTimeRange(start, end), "Invalid time range");
  if (distinct) {
    Observable<DataPoint<AvailabilityType>> availabilityData = findDataPoints(id, start, end, 0, order)
        .distinctUntilChanged(DataPoint::getValue);
    if (limit <= 0) {
      return availabilityData;
    } else {
      return availabilityData.limit(limit);
    }
  } else {
    return findDataPoints(id, start, end, limit, order);
  }
}

代码示例来源:origin: nurkiewicz/rxjava-book-examples

@Test
public void sample_516() throws Exception {
  Observable<Weather> measurements = Observable.empty();
  Observable<Weather> tempChanges = measurements
      .distinctUntilChanged(Weather::getTemperature);
}

代码示例来源:origin: nurkiewicz/rxjava-book-examples

@Test
public void sample_89() throws Exception {
  Observable
      .interval(10, TimeUnit.MILLISECONDS)
      .map(x -> getOrderBookLength())
      .distinctUntilChanged();
}

代码示例来源:origin: au.gov.amsa.risky/formats

@Override
public Observable<T> call(Observable<T> fixes) {
  Observable<T> result = fixes.scan((latest, fix) -> {
    if (fix.fix().mmsi() != latest.fix().mmsi())
      throw new RuntimeException("can only downsample a single vessel");
    else if (fix.fix().time() < latest.fix().time())
      throw new RuntimeException("not in ascending time order!");
    else if (fix.fix().time() - latest.fix().time() >= minTimeBetweenFixesMs
        || selector.call(fix))
      return fix;
    else
      return latest;
  });
  if (minTimeBetweenFixesMs > 0)
    // throw away repeats
    result = result.distinctUntilChanged(f -> f.fix().time());
  return result;
}

相关文章

Observable类方法