Usage and code examples of the rx.Observable.skip() method

Reposted by x33g5p2x on 2022-01-25 under Other

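This article collects code examples of the rx.Observable.skip() method in Java and shows how Observable.skip() is used in practice. The examples come mainly from GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they should serve as a useful reference. Details of Observable.skip():
Package path: rx.Observable
Class name: Observable
Method name: skip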

About Observable.skip

Returns an Observable that skips the first num items emitted by the source Observable and emits the remainder.

Scheduler: This version of skip does not operate by default on a particular Scheduler.
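
The description above can be illustrated without RxJava on the classpath. The following is a minimal plain-Java sketch that models the behaviour with a counting wrapper around a java.util.function.Consumer. The names SkipSketch, skip and demo are hypothetical and are not part of the rx.Observable API; the real operator also handles completion, errors and backpressure, which this sketch ignores.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical model of skip(n); not RxJava's implementation.
final class SkipSketch {

    /** Wraps a downstream consumer so that the first n items pushed to it are dropped. */
    static <T> Consumer<T> skip(int n, Consumer<T> downstream) {
        int[] remaining = {n}; // mutable counter captured by the lambda
        return item -> {
            if (remaining[0] > 0) {
                remaining[0]--;          // still skipping: swallow the item
            } else {
                downstream.accept(item); // past the first n items: forward it
            }
        };
    }

    /** Pushes 1..6 through skip(2) and returns what the downstream received. */
    static List<Integer> demo() {
        List<Integer> received = new ArrayList<>();
        Consumer<Integer> observer = skip(2, received::add);
        for (int i = 1; i <= 6; i++) {
            observer.accept(i);
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [3, 4, 5, 6]
    }
}
```

The inputs mirror the Observable.just(1, 2, 3, 4, 5, 6).skip(2) example further down the page: the first two items are dropped and the remainder passes through unchanged.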

Code examples

Code example source: origin: PipelineAI/pipeline

@Override
public Observable<Output> call(Observable<Bucket> window) {
  return window.scan(getEmptyOutputValue(), reduceBucket).skip(numBuckets);
}

Code example source: origin: apache/usergrid

private void removeDuplicateEdgesAsync(GraphManager gm, Edge edge){
  //now read all older versions of an edge, and remove them.  Finally calling delete
  final SearchByEdge searchByEdge =
    new SimpleSearchByEdge( edge.getSourceNode(), edge.getType(), edge.getTargetNode(), Long.MAX_VALUE,
      SearchByEdgeType.Order.DESCENDING, Optional.absent() );
  //load our versions, only retain the most recent one
  gm.loadEdgeVersions(searchByEdge).skip(1).flatMap(edgeToDelete -> {
    if (logger.isDebugEnabled()) {
      logger.debug("Duplicate edge. Marking edge {} for deletion", edgeToDelete);
    }
    return gm.markEdge(edgeToDelete );
  }).lastOrDefault(null).doOnNext(lastEdge -> {
    //no op if we hit our default
    if (lastEdge == null) {
      return;
    }
    //don't queue delete b/c that de-indexes, we need to delete the edges only since we have a version still existing to index.
    gm.deleteEdge(lastEdge).toBlocking().lastOrDefault(null); // this should throw an exception
  }).toBlocking().lastOrDefault(null);//this should throw an exception
}

Code example source: origin: apache/usergrid

return gm.loadEdgeVersions( searchByEdge )
  .skip( 1 )

Code example source: origin: PipelineAI/pipeline

protected BucketedCumulativeCounterStream(HystrixEventStream<Event> stream, int numBuckets, int bucketSizeInMs,
                     Func2<Bucket, Event, Bucket> reduceCommandCompletion,
                     Func2<Output, Bucket, Output> reduceBucket) {
  super(stream, numBuckets, bucketSizeInMs, reduceCommandCompletion);
  this.sourceStream = bucketedStream
      .scan(getEmptyOutputValue(), reduceBucket)
      .skip(numBuckets)
      .doOnSubscribe(new Action0() {
        @Override
        public void call() {
          isSourceCurrentlySubscribed.set(true);
        }
      })
      .doOnUnsubscribe(new Action0() {
        @Override
        public void call() {
          isSourceCurrentlySubscribed.set(false);
        }
      })
      .share()                        //multiple subscribers should get same data
      .onBackpressureDrop();          //if there are slow consumers, data should not buffer
}

Code example source: origin: apache/usergrid

entityEventObservable.takeWhile(writeEvent -> !tracker.shouldStopProcessingEntities()).skip(entityNumSkip)
  .flatMap(writeEvent -> {
    return Observable.just(writeEvent).doOnNext(doWork);
  writeEvent -> !tracker.shouldStopProcessingConnections()).skip(connectionNumSkip).flatMap(entityWrapper -> {
  return Observable.just(entityWrapper).doOnNext(doWork).subscribeOn(Schedulers.io());

Code example source: origin: PipelineAI/pipeline

@Test
public void testEmptyStreamProducesEmptyDistributions() {
  HystrixCollapserKey key = HystrixCollapserKey.Factory.asKey("Collapser-Batch-Size-A");
  stream = RollingCollapserBatchSizeDistributionStream.getInstance(key, 10, 100);
  stream.startCachingStreamValuesIfUnstarted();
  final CountDownLatch latch = new CountDownLatch(1);
  stream.observe().skip(10).take(10).subscribe(new Subscriber<CachedValuesHistogram>() {
    @Override
    public void onCompleted() {
      latch.countDown();
    }
    @Override
    public void onError(Throwable e) {
      fail(e.getMessage());
    }
    @Override
    public void onNext(CachedValuesHistogram distribution) {
      System.out.println("OnNext @ " + System.currentTimeMillis());
      assertEquals(0, distribution.getTotalCount());
    }
  });
  //no writes
  try {
    assertTrue(latch.await(10000, TimeUnit.MILLISECONDS));
  } catch (InterruptedException ex) {
    fail("Interrupted ex");
  }
  assertEquals(0, stream.getLatest().getTotalCount());
}

Code example source: origin: THEONE10211024/RxJavaSamples

@Override
public View onCreateView(LayoutInflater inflater,
             @Nullable ViewGroup container,
             @Nullable Bundle savedInstanceState) {
  View layout = inflater.inflate(R.layout.fragment_form_validation_comb_latest,
     container,
     false);
  ButterKnife.bind(this, layout);
  /*_email.addTextChangedListener(new TextWatcher() {
    @Override
    public void beforeTextChanged(CharSequence s, int start, int count, int after) {
      Log.d("FormValidation","beforeTextChanged");
    }
    @Override
    public void onTextChanged(CharSequence s, int start, int before, int count) {
      Log.d("FormValidation","onTextChanged");
    }
    @Override
    public void afterTextChanged(Editable s) {
      Log.d("FormValidation","afterTextChanged");
    }
  });*/
  _emailChangeObservable = RxTextView.textChanges(_email).skip(1);
  _passwordChangeObservable = RxTextView.textChanges(_password).skip(1);
  _numberChangeObservable = RxTextView.textChanges(_number).skip(1);
  _combineLatestEvents();
  return layout;
}

Code example source: origin: henrymorgen/android-advanced-light

private void skip() {
  Observable.just(1, 2, 3, 4, 5, 6).skip(2).subscribe(new Action1<Integer>() {
    @Override
    public void call(Integer integer) {
      Log.d(TAG, "skip:" + integer);
    }
  });
}

Code example source: origin: leeowenowen/rxjava-examples

@Override
public void run() {
  Observable.range(1, 10).skip(2).subscribe(new Action1<Integer>() {
    @Override
    public void call(Integer integer) {
      log(integer);
    }
  });
}

Code example source: origin: phajduk/RxValidator

public RxValidator onFocusChanged() {
 this.changeEmitter = RxView.focusChanges(et).skip(1).filter(new Func1<Boolean, Boolean>() {
  @Override public Boolean call(Boolean hasFocus) {
   return !hasFocus;
  }
 }).map(new Func1<Boolean, String>() {
  @Override public String call(Boolean aBoolean) {
   return et.getText().toString();
  }
 });
 return this;
}

Code example source: origin: phajduk/RxValidator

public RxValidator onValueChanged() {
 this.changeEmitter = RxTextView.textChanges(et).skip(1).map(new Func1<CharSequence, String>() {
  @Override public String call(CharSequence charSequence) {
   return charSequence.toString();
  }
 });
 return this;
}

Code example source: origin: com.netflix.hystrix/hystrix-core

@Override
public Observable<Output> call(Observable<Bucket> window) {
  return window.scan(getEmptyOutputValue(), reduceBucket).skip(numBuckets);
}

Code example source: origin: com.netflix.rxjava/rxjava-swing

/**
 * @see rx.observables.SwingObservable#fromRelativeMouseMotion
 */
public static Observable<Point> fromRelativeMouseMotion(final Component component) {
  final Observable<MouseEvent> events = fromMouseMotionEventsOf(component);
  return Observable.zip(events, events.skip(1), new Func2<MouseEvent, MouseEvent, Point>() {
    @Override
    public Point call(MouseEvent ev1, MouseEvent ev2) {
      return new Point(ev2.getX() - ev1.getX(), ev2.getY() - ev1.getY());
    }
  });
}
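
Zipping a stream with a copy of itself shifted by skip(1) pairs every item with its successor, which is how the snippet above turns absolute mouse positions into relative movements. Below is a plain-Java sketch of that pairing on a list of integers. PairwiseSketch and deltas are hypothetical names used only for illustration, and the sketch assumes zip stops as soon as the shorter input is exhausted.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of zip(source, source.skip(1), f) on an in-memory list.
final class PairwiseSketch {

    /** Returns next - previous for every adjacent pair of values. */
    static List<Integer> deltas(List<Integer> xs) {
        // "skip(1)": the same sequence without its first element
        List<Integer> shifted = xs.subList(Math.min(1, xs.size()), xs.size());
        List<Integer> out = new ArrayList<>();
        // "zip": walk both sequences in lockstep until the shorter one ends
        for (int i = 0; i < shifted.size(); i++) {
            out.add(shifted.get(i) - xs.get(i));
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(deltas(List.of(10, 13, 11, 20))); // prints [3, -2, 9]
    }
}
```

An input of N items yields N - 1 pairs, so an empty or single-item input produces no output at all.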

Code example source: origin: hotchemi/tiamat

void bindPreference(CheckBox checkBox, Preference<Boolean> preference) {
  subscriptions.add(preference.asObservable()
      .observeOn(AndroidSchedulers.mainThread())
      .subscribe(RxCompoundButton.checked(checkBox)));
  subscriptions.add(RxCompoundButton.checkedChanges(checkBox)
      .skip(1)
      .subscribe(preference.asAction()));
}

Code example source: origin: davidmoten/rxjava-extras

@Override
public Observable<Pair<T, Statistics>> call(Observable<T> source) {
  return source.scan(Pair.create((T) null, Statistics.create()),
      new Func2<Pair<T, Statistics>, T, Pair<T, Statistics>>() {
        @Override
        public Pair<T, Statistics> call(Pair<T, Statistics> pair, T t) {
          return Pair.create(t, pair.b().add(function.call(t)));
        }
      }).skip(1);
}
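
In RxJava 1.x, scan with an initial value emits that seed before any accumulated result, so a trailing skip(1) is a common way to drop the placeholder seed, as in the snippet above. The plain-Java sketch below models this with a running sum. ScanSkipSketch, scan and runningTotals are hypothetical names, and the integer sum stands in for the Statistics accumulator of the original code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of scan(seed, accumulator) followed by skip(1).
final class ScanSkipSketch {

    /** Models scan(seed, Integer::sum): emits the seed first, then each running total. */
    static List<Integer> scan(int seed, List<Integer> xs) {
        List<Integer> out = new ArrayList<>();
        int acc = seed;
        out.add(acc); // the seed itself is the first emission
        for (int x : xs) {
            acc += x;
            out.add(acc);
        }
        return out;
    }

    /** Models scan(0, Integer::sum).skip(1): running totals without the seed. */
    static List<Integer> runningTotals(List<Integer> xs) {
        List<Integer> scanned = scan(0, xs);
        return new ArrayList<>(scanned.subList(1, scanned.size()));
    }

    public static void main(String[] args) {
        System.out.println(scan(0, List.of(1, 2, 3)));      // prints [0, 1, 3, 6]
        System.out.println(runningTotals(List.of(1, 2, 3))); // prints [1, 3, 6]
    }
}
```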

Code example source: origin: com.github.davidmoten/rxjava-extras

@Override
public Observable<Pair<T, Statistics>> call(Observable<T> source) {
  return source.scan(Pair.create((T) null, Statistics.create()),
      new Func2<Pair<T, Statistics>, T, Pair<T, Statistics>>() {
        @Override
        public Pair<T, Statistics> call(Pair<T, Statistics> pair, T t) {
          return Pair.create(t, pair.b().add(function.call(t)));
        }
      }).skip(1);
}

Code example source: origin: com.trunk.rx.json/rxjava-json-core

protected JsonArray(Observable<T> elements) {
 super(
  Observable.<JsonToken>just(JsonArrayStart.instance())
   .concatWith(
    elements
     .concatMap(jsonElement -> Observable.<JsonToken>just(JsonComma.instance()).concatWith(jsonElement))
     .skip(1)
   )
   .concatWith(Observable.just(JsonArrayEnd.instance()))
 );
 this.elements = elements;
}
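
The constructor above prefixes every element with a comma token and then uses skip(1) to discard the comma in front of the first element, which leaves separators only between elements. The sketch below reproduces that trick on plain strings with java.util.stream, whose skip has the same "drop the first n" meaning. SeparatorSketch and jsonArray are hypothetical names, and string tokens stand in for the JsonToken objects of the original.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical string-based model of the "prefix separator, then skip(1)" trick.
final class SeparatorSketch {

    /** Joins already-serialized elements into a JSON-style array literal. */
    static String jsonArray(List<String> elements) {
        String body = elements.stream()
                .flatMap(e -> Stream.of(",", e)) // emit ",", element for every element
                .skip(1)                          // drop the leading ","
                .collect(Collectors.joining());
        return "[" + body + "]";
    }

    public static void main(String[] args) {
        System.out.println(jsonArray(List.of("1", "2", "3"))); // prints [1,2,3]
        System.out.println(jsonArray(List.of()));              // prints []
    }
}
```

Skipping on an empty sequence is harmless, so the empty-array case needs no special handling.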

Code example source: origin: nurkiewicz/rxjava-book-examples

@Test
public void sample_205() throws Exception {
  // Observable<String> that emits 75 strings
  getDataFromNetworkAsynchronously()
      .skip(10)
      .take(5)
      .map(s -> s + "_transformed")
      .subscribe(System.out::println);
}
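
Chaining skip(10) and take(5) selects a window of the stream: items 11 through 15 of the 75 in the example above. Here is a plain-Java sketch of that windowing using java.util.stream, where limit plays the role of take. SkipTakeSketch and page are hypothetical names, and the "item" + index strings are made-up stand-ins for whatever the network call actually returns.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Hypothetical model of source.skip(skip).take(take).map(...) on numbered items.
final class SkipTakeSketch {

    /** Builds items 1..total, drops the first `skip`, keeps the next `take`, then transforms. */
    static List<String> page(int total, int skip, int take) {
        return IntStream.rangeClosed(1, total)
                .skip(skip)   // drop the first `skip` items
                .limit(take)  // keep at most `take` of what is left
                .mapToObj(i -> "item" + i + "_transformed")
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // prints [item11_transformed, item12_transformed, item13_transformed,
        //         item14_transformed, item15_transformed]
        System.out.println(page(75, 10, 5));
        System.out.println(page(5, 5, 3)); // prints []
    }
}
```

Skipping at least as many items as the source emits leaves nothing for take to pass on, so the result is simply empty.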

Code example source: origin: nurkiewicz/rxjava-book-examples

@Test
public void sample_524() throws Exception {
  Observable.range(1, 5).take(3);  // [1, 2, 3]
  Observable.range(1, 5).skip(3);  // [4, 5]
  Observable.range(1, 5).skip(5);  // []
}

Code example source: origin: nurkiewicz/rxjava-book-examples

@Test
public void sample_271() throws Exception {
  Observable<Instant> timestamps = Observable
      .fromCallable(() -> dbQuery())
      .doOnSubscribe(() -> log.info("subscribe()"))
      .doOnRequest(c -> log.info("Requested {}", c))
      .doOnNext(instant -> log.info("Got: {}", instant));
  timestamps
      .zipWith(timestamps.skip(1), Duration::between)
      .map(Object::toString)
      .subscribe(log::info);
}
