本文整理了Java中rx.Observable.distinctUntilChanged()
方法的一些代码示例,展示了Observable.distinctUntilChanged()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Observable.distinctUntilChanged()
方法的具体详情如下:
包路径:rx.Observable
类名称:Observable
方法名:distinctUntilChanged
[英]Returns an Observable that emits all items emitted by the source Observable that are distinct from their immediate predecessors.
Scheduler: distinctUntilChanged does not operate by default on a particular Scheduler.
[中]返回一个Observable,该Observable发出源Observable发出的与前一个Observable不同的所有项。
调度程序:默认情况下,distinctUntilChanged不会在特定调度程序上运行。
代码示例来源:origin: apache/usergrid
private InnerIterator( int maxSize ) {
queue = new ArrayBlockingQueue<>( maxSize );
}
代码示例来源:origin: ReactiveX/RxNetty
@Override
public ConnectionProvider<W, R> newProvider(Observable<HostConnector<W, R>> hosts) {
return new ConnectionProviderImpl(hosts.map(new Func1<HostConnector<W, R>, HostHolder<W, R>>() {
@Override
public HostHolder<W, R> call(HostConnector<W, R> connector) {
HostHolder<W, R> newHolder = strategy.toHolder(connector);
connector.subscribe(newHolder.getEventListener());
return newHolder;
}
}).flatMap(new Func1<HostHolder<W, R>, Observable<HostUpdate<W, R>>>() {
@Override
public Observable<HostUpdate<W, R>> call(HostHolder<W, R> holder) {
return holder.getConnector()
.getHost()
.getCloseNotifier()
.map(new VoidToAnythingCast<HostUpdate<W, R>>())
.ignoreElements()
.onErrorResumeNext(Observable.<HostUpdate<W, R>>empty())
.concatWith(Observable.just(new HostUpdate<>(Action.Remove, holder)))
.mergeWith(Observable.just(new HostUpdate<>(Action.Add, holder)));
}
}).flatMap(newCollector(collector.<W, R>newCollector()), 1).distinctUntilChanged());
}
代码示例来源:origin: grandcentrix/ThirtyInch
}).distinctUntilChanged();
代码示例来源:origin: jhusain/learnrxjava
.distinctUntilChanged(PlayEvent::getSession)
.onErrorResumeNext(t -> {
System.out.println(" ***** complete group: " + groupedObservable.getKey());
代码示例来源:origin: leeowenowen/rxjava-examples
@Override
public void run() {
Observable.just(1, 1, 2, 2, 3, 4, 4, 1, 1, 5)
.distinctUntilChanged()
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
log(integer);
}
});
}
});
代码示例来源:origin: com.netflix.eureka/eureka2-client
@Override
public Observable<BufferState> forInterest(final Interest<T> interest) {
return Observable.create(new OnSubscribe<BufferState>() {
@Override
public void call(Subscriber<? super BufferState> subscriber) {
updatesSubject.pause();
try {
BufferState current = shouldBatch(interest);
subscriber.onNext(current);
updatesSubject.map(new Func1<Boolean, BufferState>() {
@Override
public BufferState call(Boolean tick) {
return shouldBatch(interest);
}
}).subscribe(subscriber);
} finally {
updatesSubject.resume();
}
}
}).distinctUntilChanged();
}
代码示例来源:origin: hanks-zyh/RxSerach
private Observable<Boolean> getConnectedObservable(Context context) {
return BroadcastObservable.fromConnectivityManager(context)
.distinctUntilChanged()
.filter(new Func1<Boolean, Boolean>() {
@Override public Boolean call(Boolean isConnected) {
return isConnected;
}
});
}
代码示例来源:origin: com.netflix.eureka/eureka2-client
public Observable<Interest<InstanceInfo>> interestChangeStream() {
return interestSubject.asObservable().distinctUntilChanged();
}
代码示例来源:origin: Aptoide/aptoide-client-v8
@Override public Observable<Download> getDownloadsByMd5(String md5) {
return downloadsRepository.getDownloadListByMd5(md5)
.flatMap(downloads -> Observable.from(downloads)
.filter(download -> download != null || isFileMissingFromCompletedDownload(download))
.toList())
.map(downloads -> {
if (downloads.isEmpty()) {
return null;
} else {
return downloads.get(0);
}
})
.distinctUntilChanged();
}
代码示例来源:origin: Laimiux/rxnetwork-android
/**
* Creates an observable that listens to connectivity changes
*/
public static Observable<Boolean> stream(Context context) {
final Context applicationContext = context.getApplicationContext();
final IntentFilter action = new IntentFilter(ConnectivityManager.CONNECTIVITY_ACTION);
return ContentObservable.fromBroadcast(context, action)
// To get initial connectivity status
.startWith((Intent) null)
.map(new Func1<Intent, Boolean>() {
@Override public Boolean call(Intent ignored) {
return getConnectivityStatus(applicationContext);
}
}).distinctUntilChanged();
}
}
代码示例来源:origin: Aptoide/aptoide-client-v8
public Observable<Download> getAsListDownload(String md5) {
return downloadAccessor.getAsList(md5)
.map(downloads -> {
for (int i = 0; i < downloads.size(); i++) {
Download download = downloads.get(i);
if (download == null || (download.getOverallDownloadStatus() == Download.COMPLETED
&& getStateIfFileExists(download) == Download.FILE_MISSING)) {
downloads.remove(i);
i--;
}
}
if (downloads.isEmpty()) {
return null;
} else {
return downloads.get(0);
}
})
.distinctUntilChanged();
}
代码示例来源:origin: org.hawkular.metrics/hawkular-metrics-core-service
@Override
public Observable<DataPoint<String>> findStringData(MetricId<String> id, long start, long end, boolean distinct,
int limit, Order order) {
checkArgument(isValidTimeRange(start, end));
if (distinct) {
return findDataPoints(id, start, end, limit, order).distinctUntilChanged(DataPoint::getValue);
} else {
return findDataPoints(id, start, end, limit, order);
}
}
代码示例来源:origin: hawkular/hawkular-metrics
@Override
public Observable<DataPoint<String>> findStringData(MetricId<String> id, long start, long end, boolean distinct,
int limit, Order order) {
checkArgument(isValidTimeRange(start, end));
if (distinct) {
return findDataPoints(id, start, end, limit, order).distinctUntilChanged(DataPoint::getValue);
} else {
return findDataPoints(id, start, end, limit, order);
}
}
代码示例来源:origin: au.gov.amsa.risky/formats
public static <T extends Fix> Transformer<T, T> ignoreOutOfOrderFixes(final boolean enabled) {
return o -> {
return o.scan((a, b) -> {
if (!enabled || b.time() > a.time())
return b;
else
return a;
}).distinctUntilChanged();
};
}
代码示例来源:origin: com.netflix.eureka/eureka2-server
public void init() {
Observable<InstanceInfo> selfInfoStream = resolve().distinctUntilChanged();
subscription = connect(selfInfoStream).retryWhen(new RetryStrategyFunc(500)).subscribe();
}
代码示例来源:origin: hawkular/hawkular-metrics
@Override
public Observable<DataPoint<AvailabilityType>> findAvailabilityData(MetricId<AvailabilityType> id, long start,
long end, boolean distinct, int limit, Order order) {
checkArgument(isValidTimeRange(start, end), "Invalid time range");
if (distinct) {
Observable<DataPoint<AvailabilityType>> availabilityData = findDataPoints(id, start, end, 0, order)
.distinctUntilChanged(DataPoint::getValue);
if (limit <= 0) {
return availabilityData;
} else {
return availabilityData.limit(limit);
}
} else {
return findDataPoints(id, start, end, limit, order);
}
}
代码示例来源:origin: org.hawkular.metrics/hawkular-metrics-core-service
@Override
public Observable<DataPoint<AvailabilityType>> findAvailabilityData(MetricId<AvailabilityType> id, long start,
long end, boolean distinct, int limit, Order order) {
checkArgument(isValidTimeRange(start, end), "Invalid time range");
if (distinct) {
Observable<DataPoint<AvailabilityType>> availabilityData = findDataPoints(id, start, end, 0, order)
.distinctUntilChanged(DataPoint::getValue);
if (limit <= 0) {
return availabilityData;
} else {
return availabilityData.limit(limit);
}
} else {
return findDataPoints(id, start, end, limit, order);
}
}
代码示例来源:origin: nurkiewicz/rxjava-book-examples
@Test
public void sample_516() throws Exception {
Observable<Weather> measurements = Observable.empty();
Observable<Weather> tempChanges = measurements
.distinctUntilChanged(Weather::getTemperature);
}
代码示例来源:origin: nurkiewicz/rxjava-book-examples
@Test
public void sample_89() throws Exception {
Observable
.interval(10, TimeUnit.MILLISECONDS)
.map(x -> getOrderBookLength())
.distinctUntilChanged();
}
代码示例来源:origin: au.gov.amsa.risky/formats
@Override
public Observable<T> call(Observable<T> fixes) {
Observable<T> result = fixes.scan((latest, fix) -> {
if (fix.fix().mmsi() != latest.fix().mmsi())
throw new RuntimeException("can only downsample a single vessel");
else if (fix.fix().time() < latest.fix().time())
throw new RuntimeException("not in ascending time order!");
else if (fix.fix().time() - latest.fix().time() >= minTimeBetweenFixesMs
|| selector.call(fix))
return fix;
else
return latest;
});
if (minTimeBetweenFixesMs > 0)
// throw away repeats
result = result.distinctUntilChanged(f -> f.fix().time());
return result;
}
内容来源于网络,如有侵权,请联系作者删除!