本文整理了Java中rx.Observable.skip()
方法的一些代码示例,展示了Observable.skip()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Observable.skip()
方法的具体详情如下:
包路径:rx.Observable
类名称:Observable
方法名:skip
[英]Returns an Observable that skips the first num items emitted by the source Observable and emits the remainder.
Scheduler: This version of skip does not operate by default on a particular Scheduler.
[中]返回一个Observable,它跳过源Observable发出的第一个num项,并发出剩余的num项。
调度程序:默认情况下,此版本的skip不会在特定调度程序上运行。
代码示例来源:origin: PipelineAI/pipeline
@Override
public Observable<Output> call(Observable<Bucket> window) {
return window.scan(getEmptyOutputValue(), reduceBucket).skip(numBuckets);
}
};
代码示例来源:origin: apache/usergrid
private void removeDuplicateEdgesAsync(GraphManager gm, Edge edge){
//now read all older versions of an edge, and remove them. Finally calling delete
final SearchByEdge searchByEdge =
new SimpleSearchByEdge( edge.getSourceNode(), edge.getType(), edge.getTargetNode(), Long.MAX_VALUE,
SearchByEdgeType.Order.DESCENDING, Optional.absent() );
//load our versions, only retain the most recent one
gm.loadEdgeVersions(searchByEdge).skip(1).flatMap(edgeToDelete -> {
if (logger.isDebugEnabled()) {
logger.debug("Duplicate edge. Marking edge {} for deletion", edgeToDelete);
}
return gm.markEdge(edgeToDelete );
}).lastOrDefault(null).doOnNext(lastEdge -> {
//no op if we hit our default
if (lastEdge == null) {
return;
}
//don't queue delete b/c that de-indexes, we need to delete the edges only since we have a version still existing to index.
gm.deleteEdge(lastEdge).toBlocking().lastOrDefault(null); // this should throw an exception
}).toBlocking().lastOrDefault(null);//this should throw an exception
}
代码示例来源:origin: apache/usergrid
return gm.loadEdgeVersions( searchByEdge )
.skip( 1 )
代码示例来源:origin: PipelineAI/pipeline
protected BucketedCumulativeCounterStream(HystrixEventStream<Event> stream, int numBuckets, int bucketSizeInMs,
Func2<Bucket, Event, Bucket> reduceCommandCompletion,
Func2<Output, Bucket, Output> reduceBucket) {
super(stream, numBuckets, bucketSizeInMs, reduceCommandCompletion);
this.sourceStream = bucketedStream
.scan(getEmptyOutputValue(), reduceBucket)
.skip(numBuckets)
.doOnSubscribe(new Action0() {
@Override
public void call() {
isSourceCurrentlySubscribed.set(true);
}
})
.doOnUnsubscribe(new Action0() {
@Override
public void call() {
isSourceCurrentlySubscribed.set(false);
}
})
.share() //multiple subscribers should get same data
.onBackpressureDrop(); //if there are slow consumers, data should not buffer
}
代码示例来源:origin: apache/usergrid
entityEventObservable.takeWhile(writeEvent -> !tracker.shouldStopProcessingEntities()).skip(entityNumSkip)
.flatMap(writeEvent -> {
return Observable.just(writeEvent).doOnNext(doWork);
writeEvent -> !tracker.shouldStopProcessingConnections()).skip(connectionNumSkip).flatMap(entityWrapper -> {
return Observable.just(entityWrapper).doOnNext(doWork).subscribeOn(Schedulers.io());
代码示例来源:origin: PipelineAI/pipeline
@Test
public void testEmptyStreamProducesEmptyDistributions() {
HystrixCollapserKey key = HystrixCollapserKey.Factory.asKey("Collapser-Batch-Size-A");
stream = RollingCollapserBatchSizeDistributionStream.getInstance(key, 10, 100);
stream.startCachingStreamValuesIfUnstarted();
final CountDownLatch latch = new CountDownLatch(1);
stream.observe().skip(10).take(10).subscribe(new Subscriber<CachedValuesHistogram>() {
@Override
public void onCompleted() {
latch.countDown();
}
@Override
public void onError(Throwable e) {
fail(e.getMessage());
}
@Override
public void onNext(CachedValuesHistogram distribution) {
System.out.println("OnNext @ " + System.currentTimeMillis());
assertEquals(0, distribution.getTotalCount());
}
});
//no writes
try {
assertTrue(latch.await(10000, TimeUnit.MILLISECONDS));
} catch (InterruptedException ex) {
fail("Interrupted ex");
}
assertEquals(0, stream.getLatest().getTotalCount());
}
代码示例来源:origin: THEONE10211024/RxJavaSamples
@Override
public View onCreateView(LayoutInflater inflater,
@Nullable ViewGroup container,
@Nullable Bundle savedInstanceState) {
View layout = inflater.inflate(R.layout.fragment_form_validation_comb_latest,
container,
false);
ButterKnife.bind(this, layout);
/*_email.addTextChangedListener(new TextWatcher() {
@Override
public void beforeTextChanged(CharSequence s, int start, int count, int after) {
Log.d("FormValidation","beforeTextChanged");
}
@Override
public void onTextChanged(CharSequence s, int start, int before, int count) {
Log.d("FormValidation","onTextChanged");
}
@Override
public void afterTextChanged(Editable s) {
Log.d("FormValidation","afterTextChanged");
}
});*/
_emailChangeObservable = RxTextView.textChanges(_email).skip(1);
_passwordChangeObservable = RxTextView.textChanges(_password).skip(1);
_numberChangeObservable = RxTextView.textChanges(_number).skip(1);
_combineLatestEvents();
return layout;
}
代码示例来源:origin: henrymorgen/android-advanced-light
private void skip() {
Observable.just(1, 2, 3, 4, 5, 6).skip(2).subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.d(TAG, "skip:" + integer);
}
});
}
代码示例来源:origin: leeowenowen/rxjava-examples
@Override
public void run() {
Observable.range(1, 10).skip(2).subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
log(integer);
}
});
}
});
代码示例来源:origin: phajduk/RxValidator
public RxValidator onFocusChanged() {
this.changeEmitter = RxView.focusChanges(et).skip(1).filter(new Func1<Boolean, Boolean>() {
@Override public Boolean call(Boolean hasFocus) {
return !hasFocus;
}
}).map(new Func1<Boolean, String>() {
@Override public String call(Boolean aBoolean) {
return et.getText().toString();
}
});
return this;
}
代码示例来源:origin: phajduk/RxValidator
public RxValidator onValueChanged() {
this.changeEmitter = RxTextView.textChanges(et).skip(1).map(new Func1<CharSequence, String>() {
@Override public String call(CharSequence charSequence) {
return charSequence.toString();
}
});
return this;
}
代码示例来源:origin: com.netflix.hystrix/hystrix-core
@Override
public Observable<Output> call(Observable<Bucket> window) {
return window.scan(getEmptyOutputValue(), reduceBucket).skip(numBuckets);
}
};
代码示例来源:origin: com.netflix.rxjava/rxjava-swing
/**
* @see rx.observables.SwingObservable#fromRelativeMouseMotion
*/
public static Observable<Point> fromRelativeMouseMotion(final Component component) {
final Observable<MouseEvent> events = fromMouseMotionEventsOf(component);
return Observable.zip(events, events.skip(1), new Func2<MouseEvent, MouseEvent, Point>() {
@Override
public Point call(MouseEvent ev1, MouseEvent ev2) {
return new Point(ev2.getX() - ev1.getX(), ev2.getY() - ev1.getY());
}
});
}
代码示例来源:origin: hotchemi/tiamat
void bindPreference(CheckBox checkBox, Preference<Boolean> preference) {
subscriptions.add(preference.asObservable()
.observeOn(AndroidSchedulers.mainThread())
.subscribe(RxCompoundButton.checked(checkBox)));
subscriptions.add(RxCompoundButton.checkedChanges(checkBox)
.skip(1)
.subscribe(preference.asAction()));
}
}
代码示例来源:origin: davidmoten/rxjava-extras
@Override
public Observable<Pair<T, Statistics>> call(Observable<T> source) {
return source.scan(Pair.create((T) null, Statistics.create()),
new Func2<Pair<T, Statistics>, T, Pair<T, Statistics>>() {
@Override
public Pair<T, Statistics> call(Pair<T, Statistics> pair, T t) {
return Pair.create(t, pair.b().add(function.call(t)));
}
}).skip(1);
}
};
代码示例来源:origin: com.github.davidmoten/rxjava-extras
@Override
public Observable<Pair<T, Statistics>> call(Observable<T> source) {
return source.scan(Pair.create((T) null, Statistics.create()),
new Func2<Pair<T, Statistics>, T, Pair<T, Statistics>>() {
@Override
public Pair<T, Statistics> call(Pair<T, Statistics> pair, T t) {
return Pair.create(t, pair.b().add(function.call(t)));
}
}).skip(1);
}
};
代码示例来源:origin: com.trunk.rx.json/rxjava-json-core
protected JsonArray(Observable<T> elements) {
super(
Observable.<JsonToken>just(JsonArrayStart.instance())
.concatWith(
elements
.concatMap(jsonElement -> Observable.<JsonToken>just(JsonComma.instance()).concatWith(jsonElement))
.skip(1)
)
.concatWith(Observable.just(JsonArrayEnd.instance()))
);
this.elements = elements;
}
代码示例来源:origin: nurkiewicz/rxjava-book-examples
@Test
public void sample_205() throws Exception {
// Observable<String>
// that emits 75 strings
getDataFromNetworkAsynchronously()
.skip(10)
.take(5)
.map(s -> s + "_transformed")
.subscribe(System.out::println);
}
代码示例来源:origin: nurkiewicz/rxjava-book-examples
@Test
public void sample_524() throws Exception {
Observable.range(1, 5).take(3); // [1, 2, 3]
Observable.range(1, 5).skip(3); // [4, 5]
Observable.range(1, 5).skip(5); // []
}
代码示例来源:origin: nurkiewicz/rxjava-book-examples
@Test
public void sample_271() throws Exception {
Observable<Instant> timestamps = Observable
.fromCallable(() -> dbQuery())
.doOnSubscribe(() -> log.info("subscribe()"))
.doOnRequest(c -> log.info("Requested {}", c))
.doOnNext(instant -> log.info("Got: {}", instant));
timestamps
.zipWith(timestamps.skip(1), Duration::between)
.map(Object::toString)
.subscribe(log::info);
}
内容来源于网络,如有侵权,请联系作者删除!