Usage and code examples of the rx.Observable.skip() method

Reposted by x33g5p2x on 2022-01-25, category: Other

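This article collects code examples of the rx.Observable.skip() method in Java and shows how Observable.skip() is used in practice. The examples come mainly from GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they are useful as reference material. Details of Observable.skip():
Fully qualified class: rx.Observable
Class name: Observable
Method name: skip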

About Observable.skip

Returns an Observable that skips the first num items emitted by the source Observable and emits the remainder.

Scheduler: This version of skip does not operate by default on a particular Scheduler.
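
The behavior described above can be sketched without RxJava on the classpath. The block below is a minimal illustration in plain Java that uses java.util.stream.Stream.skip as a stand-in for Observable.skip. The class SkipSketch and the method skipFirst are hypothetical names introduced here, and the sketch ignores the asynchronous, push-based side of an Observable.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical illustration class; not part of RxJava.
final class SkipSketch {

    // Models skip(count): drop the first `count` items, keep the rest in order.
    static <T> List<T> skipFirst(List<T> items, int count) {
        return items.stream()
                .skip(count) // Stream.skip, used here as a stand-in for Observable.skip
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> source = List.of(1, 2, 3, 4, 5);
        System.out.println(skipFirst(source, 3)); // [4, 5]
        System.out.println(skipFirst(source, 5)); // []
        System.out.println(skipFirst(source, 0)); // [1, 2, 3, 4, 5]
    }
}
```

With RxJava 1.x available, the corresponding call would be Observable.range(1, 5).skip(3), which emits 4 and 5.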

Code examples

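Code example source: PipelineAI/pipeline (the same code is also listed under com.netflix.hystrix/hystrix-core)

    // Excerpt: the enclosing anonymous function declaration is not shown.
    @Override
    public Observable<Output> call(Observable<Bucket> window) {
        // Accumulate buckets with scan, then drop the first numBuckets running results.
        return window.scan(getEmptyOutputValue(), reduceBucket).skip(numBuckets);
    }
    };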

Code example source: apache/usergrid

    private void removeDuplicateEdgesAsync(GraphManager gm, Edge edge) {
        // now read all older versions of an edge, and remove them. Finally calling delete
        final SearchByEdge searchByEdge =
            new SimpleSearchByEdge(edge.getSourceNode(), edge.getType(), edge.getTargetNode(), Long.MAX_VALUE,
                SearchByEdgeType.Order.DESCENDING, Optional.absent());

        // load our versions, only retain the most recent one:
        // versions arrive in DESCENDING order, so skip(1) passes every version except the newest
        gm.loadEdgeVersions(searchByEdge).skip(1).flatMap(edgeToDelete -> {
            if (logger.isDebugEnabled()) {
                logger.debug("Duplicate edge. Marking edge {} for deletion", edgeToDelete);
            }
            return gm.markEdge(edgeToDelete);
        }).lastOrDefault(null).doOnNext(lastEdge -> {
            // no op if we hit our default
            if (lastEdge == null) {
                return;
            }
            // don't queue delete b/c that de-indexes, we need to delete the edges only
            // since we have a version still existing to index.
            gm.deleteEdge(lastEdge).toBlocking().lastOrDefault(null); // this should throw an exception
        }).toBlocking().lastOrDefault(null); // this should throw an exception
    }

Code example source: apache/usergrid

    // Excerpt: the rest of the chain is not shown.
    return gm.loadEdgeVersions( searchByEdge )
        .skip( 1 )

Code example source: PipelineAI/pipeline

    protected BucketedCumulativeCounterStream(HystrixEventStream<Event> stream, int numBuckets, int bucketSizeInMs,
                                              Func2<Bucket, Event, Bucket> reduceCommandCompletion,
                                              Func2<Output, Bucket, Output> reduceBucket) {
        super(stream, numBuckets, bucketSizeInMs, reduceCommandCompletion);
        this.sourceStream = bucketedStream
            .scan(getEmptyOutputValue(), reduceBucket)
            .skip(numBuckets) // drop the first numBuckets cumulative results
            .doOnSubscribe(new Action0() {
                @Override
                public void call() {
                    isSourceCurrentlySubscribed.set(true);
                }
            })
            .doOnUnsubscribe(new Action0() {
                @Override
                public void call() {
                    isSourceCurrentlySubscribed.set(false);
                }
            })
            .share()                // multiple subscribers should get same data
            .onBackpressureDrop();  // if there are slow consumers, data should not buffer
    }

Code example source: apache/usergrid

    // Excerpt: two separate call sites; the lines between them are not shown in the source.
    entityEventObservable.takeWhile(writeEvent -> !tracker.shouldStopProcessingEntities()).skip(entityNumSkip)
        .flatMap(writeEvent -> {
            return Observable.just(writeEvent).doOnNext(doWork);
    // (excerpt is discontinuous here; the next line starts mid-expression)
    writeEvent -> !tracker.shouldStopProcessingConnections()).skip(connectionNumSkip).flatMap(entityWrapper -> {
            return Observable.just(entityWrapper).doOnNext(doWork).subscribeOn(Schedulers.io());

Code example source: PipelineAI/pipeline

    @Test
    public void testEmptyStreamProducesEmptyDistributions() {
        HystrixCollapserKey key = HystrixCollapserKey.Factory.asKey("Collapser-Batch-Size-A");
        stream = RollingCollapserBatchSizeDistributionStream.getInstance(key, 10, 100);
        stream.startCachingStreamValuesIfUnstarted();

        final CountDownLatch latch = new CountDownLatch(1);
        // Ignore the first 10 emissions, then observe the next 10.
        stream.observe().skip(10).take(10).subscribe(new Subscriber<CachedValuesHistogram>() {
            @Override
            public void onCompleted() {
                latch.countDown();
            }

            @Override
            public void onError(Throwable e) {
                fail(e.getMessage());
            }

            @Override
            public void onNext(CachedValuesHistogram distribution) {
                System.out.println("OnNext @ " + System.currentTimeMillis());
                assertEquals(0, distribution.getTotalCount());
            }
        });

        // no writes
        try {
            assertTrue(latch.await(10000, TimeUnit.MILLISECONDS));
        } catch (InterruptedException ex) {
            fail("Interrupted ex");
        }
        assertEquals(0, stream.getLatest().getTotalCount());
    }

Code example source: THEONE10211024/RxJavaSamples

    @Override
    public View onCreateView(LayoutInflater inflater,
                             @Nullable ViewGroup container,
                             @Nullable Bundle savedInstanceState) {
        View layout = inflater.inflate(R.layout.fragment_form_validation_comb_latest,
                                       container,
                                       false);
        ButterKnife.bind(this, layout);

        /*_email.addTextChangedListener(new TextWatcher() {
            @Override
            public void beforeTextChanged(CharSequence s, int start, int count, int after) {
                Log.d("FormValidation","beforeTextChanged");
            }
            @Override
            public void onTextChanged(CharSequence s, int start, int before, int count) {
                Log.d("FormValidation","onTextChanged");
            }
            @Override
            public void afterTextChanged(Editable s) {
                Log.d("FormValidation","afterTextChanged");
            }
        });*/

        // textChanges emits the current text on subscription; skip(1) drops that initial value.
        _emailChangeObservable = RxTextView.textChanges(_email).skip(1);
        _passwordChangeObservable = RxTextView.textChanges(_password).skip(1);
        _numberChangeObservable = RxTextView.textChanges(_number).skip(1);
        _combineLatestEvents();
        return layout;
    }

Code example source: henrymorgen/android-advanced-light

    private void skip() {
        // Skips 1 and 2, so 3, 4, 5 and 6 are logged.
        Observable.just(1, 2, 3, 4, 5, 6).skip(2).subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer integer) {
                Log.d(TAG, "skip:" + integer);
            }
        });
    }

Code example source: leeowenowen/rxjava-examples

    // Excerpt: the enclosing anonymous Runnable declaration is not shown.
    @Override
    public void run() {
        // range(1, 10) emits 1..10; skip(2) drops 1 and 2.
        Observable.range(1, 10).skip(2).subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer integer) {
                log(integer);
            }
        });
    }
    });

Code example source: phajduk/RxValidator

    public RxValidator onFocusChanged() {
        // focusChanges emits the current focus state on subscription; skip(1) drops it.
        this.changeEmitter = RxView.focusChanges(et).skip(1).filter(new Func1<Boolean, Boolean>() {
            @Override public Boolean call(Boolean hasFocus) {
                return !hasFocus; // only react when focus is lost
            }
        }).map(new Func1<Boolean, String>() {
            @Override public String call(Boolean aBoolean) {
                return et.getText().toString();
            }
        });
        return this;
    }

Code example source: phajduk/RxValidator

    public RxValidator onValueChanged() {
        // skip(1) drops the initial text emitted on subscription.
        this.changeEmitter = RxTextView.textChanges(et).skip(1).map(new Func1<CharSequence, String>() {
            @Override public String call(CharSequence charSequence) {
                return charSequence.toString();
            }
        });
        return this;
    }

Code example source: com.netflix.rxjava/rxjava-swing

    /**
     * @see rx.observables.SwingObservable#fromRelativeMouseMotion
     */
    public static Observable<Point> fromRelativeMouseMotion(final Component component) {
        final Observable<MouseEvent> events = fromMouseMotionEventsOf(component);
        // Zip each event with its successor (events.skip(1)) to get the movement between them.
        return Observable.zip(events, events.skip(1), new Func2<MouseEvent, MouseEvent, Point>() {
            @Override
            public Point call(MouseEvent ev1, MouseEvent ev2) {
                return new Point(ev2.getX() - ev1.getX(), ev2.getY() - ev1.getY());
            }
        });
    }
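
The zip(events, events.skip(1), ...) idiom above pairs every item with its successor. As a rough sketch of that pairing in plain Java (no RxJava, no Swing), the block below computes the difference between consecutive integers in a list. PairwiseSketch and consecutiveDeltas are hypothetical names introduced for illustration only.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration class; not part of RxJava or rxjava-swing.
final class PairwiseSketch {

    // Pairs items[i] with items[i + 1], mirroring zip(source, source.skip(1), f),
    // and returns the difference of each consecutive pair.
    static List<Integer> consecutiveDeltas(List<Integer> items) {
        // The "skip(1)" view: everything except the first item.
        List<Integer> shifted = items.subList(Math.min(1, items.size()), items.size());
        List<Integer> deltas = new ArrayList<>();
        // zip stops at the shorter input, so there is one pair fewer than there are items.
        for (int i = 0; i < shifted.size(); i++) {
            deltas.add(shifted.get(i) - items.get(i));
        }
        return deltas;
    }

    public static void main(String[] args) {
        System.out.println(consecutiveDeltas(List.of(10, 13, 12, 20))); // [3, -1, 8]
    }
}
```

One difference from the list version: in RxJava the source is subscribed to twice, once directly and once through skip(1), which matters when the source is cold and each subscription triggers its own work.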

Code example source: hotchemi/tiamat

    // Excerpt: the final brace closes the enclosing class, which is not shown.
    void bindPreference(CheckBox checkBox, Preference<Boolean> preference) {
        subscriptions.add(preference.asObservable()
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(RxCompoundButton.checked(checkBox)));
        subscriptions.add(RxCompoundButton.checkedChanges(checkBox)
            .skip(1) // drop the initial checked state emitted on subscription
            .subscribe(preference.asAction()));
    }
    }

Code example source: davidmoten/rxjava-extras (the same code is also listed under com.github.davidmoten/rxjava-extras)

    // Excerpt: the enclosing anonymous Transformer declaration is not shown.
    @Override
    public Observable<Pair<T, Statistics>> call(Observable<T> source) {
        return source.scan(Pair.create((T) null, Statistics.create()),
            new Func2<Pair<T, Statistics>, T, Pair<T, Statistics>>() {
                @Override
                public Pair<T, Statistics> call(Pair<T, Statistics> pair, T t) {
                    return Pair.create(t, pair.b().add(function.call(t)));
                }
            }).skip(1); // drop the seed pair that scan emits first
    }
    };
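
The scan(seed, accumulator).skip(1) combination above relies on scan emitting the seed itself before any accumulated value. A minimal plain-Java sketch of that behavior, using running sums over a list instead of an Observable, could look as follows. ScanSkipSketch and runningSums are hypothetical names, and integer addition stands in for the Statistics accumulator.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration class; not part of RxJava or rxjava-extras.
final class ScanSkipSketch {

    // Models scan(seed, accumulator): emits the seed first,
    // then one running result per source item.
    static List<Integer> runningSums(int seed, List<Integer> items) {
        List<Integer> out = new ArrayList<>();
        int acc = seed;
        out.add(acc); // the seed is the first emission
        for (int item : items) {
            acc += item;
            out.add(acc);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> scanned = runningSums(0, List.of(1, 2, 3));
        System.out.println(scanned);                            // [0, 1, 3, 6]
        // The "skip(1)" step removes the seed, leaving one result per source item.
        System.out.println(scanned.subList(1, scanned.size())); // [1, 3, 6]
    }
}
```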

Code example source: com.trunk.rx.json/rxjava-json-core

    protected JsonArray(Observable<T> elements) {
        super(
            Observable.<JsonToken>just(JsonArrayStart.instance())
                .concatWith(
                    elements
                        // prefix every element with a comma ...
                        .concatMap(jsonElement -> Observable.<JsonToken>just(JsonComma.instance()).concatWith(jsonElement))
                        // ... then drop the comma in front of the first element
                        .skip(1)
                )
                .concatWith(Observable.just(JsonArrayEnd.instance()))
        );
        this.elements = elements;
    }

Code example source: nurkiewicz/rxjava-book-examples

    @Test
    public void sample_205() throws Exception {
        // Observable<String>
        // that emits 75 strings
        getDataFromNetworkAsynchronously()
            .skip(10)
            .take(5)
            .map(s -> s + "_transformed")
            .subscribe(System.out::println);
    }

Code example source: nurkiewicz/rxjava-book-examples

    @Test
    public void sample_524() throws Exception {
        Observable.range(1, 5).take(3);  // [1, 2, 3]
        Observable.range(1, 5).skip(3);  // [4, 5]
        Observable.range(1, 5).skip(5);  // []
    }

Code example source: nurkiewicz/rxjava-book-examples

    @Test
    public void sample_271() throws Exception {
        Observable<Instant> timestamps = Observable
            .fromCallable(() -> dbQuery())
            .doOnSubscribe(() -> log.info("subscribe()"))
            .doOnRequest(c -> log.info("Requested {}", c))
            .doOnNext(instant -> log.info("Got: {}", instant));

        timestamps
            .zipWith(timestamps.skip(1), Duration::between)
            .map(Object::toString)
            .subscribe(log::info);
    }
