Usage and code examples for the io.reactivex.Observable.repeat() method

x33g5p2x, reposted on 2022-01-25 under "Other"

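This article collects code examples for the Java method io.reactivex.Observable.repeat() and shows how Observable.repeat() is used in practice. The examples come mainly from GitHub, Stack Overflow, and Maven and were extracted from selected open-source projects, so they should serve as a useful reference. The details of Observable.repeat() are as follows:
Package: io.reactivex
Class name: Observable
Method name: repeat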

About Observable.repeat

Returns an Observable that repeats the sequence of items emitted by the source ObservableSource indefinitely.

Scheduler: repeat does not operate by default on a particular Scheduler.
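
The description above can be pictured with a small plain-Java model. This is only a sketch of the semantics, not RxJava's implementation: the `RepeatModel` class and its `Source` interface are hypothetical names invented for illustration, and the model collects items into a list instead of pushing them to an Observer. It mirrors three behaviours visible in the examples on this page: the source is replayed in sequence once per repetition, a count of 0 yields nothing, and a negative count is rejected.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** A toy model of repeat semantics; hypothetical, not part of RxJava. */
public class RepeatModel {

    /** Hypothetical stand-in for a cold source: every call replays its items. */
    interface Source<T> {
        void emitTo(Consumer<T> onNext);
    }

    /**
     * Replays the source {@code times} times, one pass after another, and
     * collects every item. An unchecked exception thrown by the source ends
     * the run immediately; the failed pass is not repeated.
     */
    static <T> List<T> repeat(Source<T> source, long times) {
        if (times < 0) {
            throw new IllegalArgumentException("times >= 0 required but it was " + times);
        }
        List<T> out = new ArrayList<>();
        for (long i = 0; i < times; i++) {
            source.emitTo(out::add); // "resubscribe" once the previous pass has completed
        }
        return out;
    }

    public static void main(String[] args) {
        Source<Integer> oneTwo = onNext -> {
            onNext.accept(1);
            onNext.accept(2);
        };
        System.out.println(repeat(oneTwo, 2)); // prints [1, 2, 1, 2]
        System.out.println(repeat(oneTwo, 0)); // prints []
    }
}
```

The model deliberately omits the no-argument form. In RxJava that overload delegates to repeat(Long.MAX_VALUE), so in practice it is paired with a downstream operator such as take(n) or takeUntil(...) that ends the sequence.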

Code examples

Code example source: ReactiveX/RxJava

    /**
     * Returns an Observable that repeats the sequence of items emitted by the source ObservableSource indefinitely.
     * <p>
     * <img width="640" height="287" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/repeatInf.o.png" alt="">
     * <dl>
     *  <dt><b>Scheduler:</b></dt>
     *  <dd>{@code repeat} does not operate by default on a particular {@link Scheduler}.</dd>
     * </dl>
     *
     * @return an Observable that emits the items emitted by the source ObservableSource repeatedly and in sequence
     * @see <a href="http://reactivex.io/documentation/operators/repeat.html">ReactiveX operators documentation: Repeat</a>
     */
    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.NONE)
    public final Observable<T> repeat() {
        return repeat(Long.MAX_VALUE);
    }

Code example source: ReactiveX/RxJava

    @Test(timeout = 2000)
    public void testRepeatZero() {
        Observer<Object> o = TestHelper.mockObserver();
        // repeat(0) completes immediately without emitting anything
        Observable.just(1).repeat(0).subscribe(o);
        verify(o).onComplete();
        verify(o, never()).onNext(any());
        verify(o, never()).onError(any(Throwable.class));
    }

Code example source: ReactiveX/RxJava

    @Test(timeout = 2000)
    public void testRepeatLimited() {
        Observer<Object> o = TestHelper.mockObserver();
        // a single-item source repeated 10 times emits the item 10 times
        Observable.just(1).repeat(10).subscribe(o);
        verify(o, times(10)).onNext(1);
        verify(o).onComplete();
        verify(o, never()).onError(any(Throwable.class));
    }

Code example source: ReactiveX/RxJava

    @Test(timeout = 2000)
    public void testRepeatOne() {
        Observer<Object> o = TestHelper.mockObserver();
        // repeat(1) subscribes to the source exactly once
        Observable.just(1).repeat(1).subscribe(o);
        verify(o).onComplete();
        verify(o, times(1)).onNext(any());
        verify(o, never()).onError(any(Throwable.class));
    }

Code example source: ReactiveX/RxJava

    @Test
    public void repeatLongPredicateInvalid() {
        try {
            // a negative count is rejected when the operator is assembled
            Observable.just(1).repeat(-99);
            fail("Should have thrown");
        } catch (IllegalArgumentException ex) {
            assertEquals("times >= 0 required but it was -99", ex.getMessage());
        }
    }

Code example source: ReactiveX/RxJava

    @Test(timeout = 2000)
    public void testRepeatAndTake() {
        Observer<Object> o = TestHelper.mockObserver();
        // take(10) bounds the otherwise endless repeat()
        Observable.just(1).repeat().take(10).subscribe(o);
        verify(o, times(10)).onNext(1);
        verify(o).onComplete();
        verify(o, never()).onError(any(Throwable.class));
    }

Code example source: ReactiveX/RxJava

    @Test(timeout = 2000)
    public void bufferWithSizeTake1() {
        Observable<Integer> source = Observable.just(1).repeat();
        Observable<List<Integer>> result = source.buffer(2).take(1);
        Observer<Object> o = TestHelper.mockObserver();
        result.subscribe(o);
        verify(o).onNext(Arrays.asList(1, 1));
        verify(o).onComplete();
        verify(o, never()).onError(any(Throwable.class));
    }

Code example source: ReactiveX/RxJava

    @Test(timeout = 2000)
    public void bufferWithSizeSkipTake1() {
        Observable<Integer> source = Observable.just(1).repeat();
        Observable<List<Integer>> result = source.buffer(2, 3).take(1);
        Observer<Object> o = TestHelper.mockObserver();
        result.subscribe(o);
        verify(o).onNext(Arrays.asList(1, 1));
        verify(o).onComplete();
        verify(o, never()).onError(any(Throwable.class));
    }

Code example source: ReactiveX/RxJava

    @Test(timeout = 2000)
    public void testRepeatError() {
        Observer<Object> o = TestHelper.mockObserver();
        // an error from the source is passed through and is not repeated
        Observable.error(new TestException()).repeat(10).subscribe(o);
        verify(o).onError(any(TestException.class));
        verify(o, never()).onNext(any());
        verify(o, never()).onComplete();
    }

Code example source: ReactiveX/RxJava

    @Test(timeout = 20000)
    public void testNoStackOverFlow() {
        Observable.just(1).repeat().subscribeOn(Schedulers.newThread()).take(100000).blockingLast();
    }

Code example source: ReactiveX/RxJava

    @Test(timeout = 2000)
    public void testRepeatTake() {
        Observable<Integer> xs = Observable.just(1, 2);
        Object[] ys = xs.repeat().subscribeOn(Schedulers.newThread()).take(4).toList().blockingGet().toArray();
        assertArrayEquals(new Object[] { 1, 2, 1, 2 }, ys);
    }

Code example source: ReactiveX/RxJava

    @Test
    public void noCancelPreviousRepeat() {
        final AtomicInteger counter = new AtomicInteger();
        Observable<Integer> source = Observable.just(1).doOnDispose(new Action() {
            @Override
            public void run() throws Exception {
                counter.getAndIncrement();
            }
        });
        source.repeat(5)
        .test()
        .assertResult(1, 1, 1, 1, 1);
        // the dispose callback never ran during the five repetitions
        assertEquals(0, counter.get());
    }

Code example source: ReactiveX/RxJava

    @Test(timeout = 2000)
    public void testRepetition() {
        int num = 10;
        final AtomicInteger count = new AtomicInteger();
        int value = Observable.unsafeCreate(new ObservableSource<Integer>() {
            @Override
            public void subscribe(final Observer<? super Integer> o) {
                // each resubscription emits the next counter value
                o.onNext(count.incrementAndGet());
                o.onComplete();
            }
        }).repeat().subscribeOn(Schedulers.computation())
        .take(num).blockingLast();
        assertEquals(num, value);
    }

Code example source: ReactiveX/RxJava

    /** Issue #2844: wrong target of request. */
    @Test(timeout = 3000)
    public void testRepeatRetarget() {
        final List<Integer> concatBase = new ArrayList<Integer>();
        TestObserver<Integer> to = new TestObserver<Integer>();
        Observable.just(1, 2)
        .repeat(5)
        .concatMap(new Function<Integer, Observable<Integer>>() {
            @Override
            public Observable<Integer> apply(Integer x) {
                System.out.println("testRepeatRetarget -> " + x);
                concatBase.add(x);
                return Observable.<Integer>empty()
                        .delay(200, TimeUnit.MILLISECONDS);
            }
        })
        .subscribe(to);
        to.awaitTerminalEvent();
        to.assertNoErrors();
        to.assertNoValues();
        assertEquals(Arrays.asList(1, 2, 1, 2, 1, 2, 1, 2, 1, 2), concatBase);
    }

Code example source: ReactiveX/RxJava

    @Test
    public void testRepeatTakeWithSubscribeOn() throws InterruptedException {
        final AtomicInteger counter = new AtomicInteger();
        Observable<Integer> oi = Observable.unsafeCreate(new ObservableSource<Integer>() {
            @Override
            public void subscribe(Observer<? super Integer> sub) {
                sub.onSubscribe(Disposables.empty());
                counter.incrementAndGet();
                sub.onNext(1);
                sub.onNext(2);
                sub.onComplete();
            }
        }).subscribeOn(Schedulers.newThread());
        Object[] ys = oi.repeat().subscribeOn(Schedulers.newThread()).map(new Function<Integer, Integer>() {
            @Override
            public Integer apply(Integer t1) {
                try {
                    Thread.sleep(50);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return t1;
            }
        }).take(4).toList().blockingGet().toArray();
        // two subscriptions are enough to produce the four items taken
        assertEquals(2, counter.get());
        assertArrayEquals(new Object[] { 1, 2, 1, 2 }, ys);
    }

Code example source: ReactiveX/RxJava

    /** Issue #2587. */
    @Test
    public void testRepeatAndDistinctUnbounded() {
        Observable<Integer> src = Observable.fromIterable(Arrays.asList(1, 2, 3, 4, 5))
                .take(3)
                .repeat(3)
                .distinct();
        TestObserver<Integer> to = new TestObserver<Integer>();
        src.subscribe(to);
        to.assertNoErrors();
        to.assertTerminated();
        to.assertValues(1, 2, 3);
    }

Code example source: diabolicallabs/vertx-cron

    public static Observable<Timed<Long>> cronspec(Scheduler scheduler, String cronspec, String timeZoneName) {
        if (timeZoneName != null) {
            boolean noneMatch = Arrays.stream(TimeZone.getAvailableIDs()).noneMatch(available -> available.equals(timeZoneName));
            if (noneMatch) throw new IllegalArgumentException("timeZoneName " + timeZoneName + " is invalid");
        }
        return Observable.just(cronspec)
            .flatMap(_cronspec -> {
                CronExpression cron;
                try {
                    cron = new CronExpression(_cronspec);
                    if (timeZoneName != null) {
                        cron.setTimeZone(TimeZone.getTimeZone(timeZoneName));
                    }
                } catch (ParseException e) {
                    throw new IllegalArgumentException("Invalid cronspec " + _cronspec, e);
                }
                // compute the delay to the next run, wait for it, then repeat to schedule the following run
                return Observable.just(cron)
                    .map(cronExpression -> cronExpression.getNextValidTimeAfter(new Date(new Date().getTime() + 500)))
                    .map(nextRunDate -> nextRunDate.getTime() - new Date().getTime())
                    .flatMap(delay -> Observable.timer(delay, TimeUnit.MILLISECONDS, scheduler))
                    .timestamp()
                    .repeat();
            });
    }

Code example source: com.microsoft.azure.v2/azure-client-runtime

    Single<HttpResponse> pollUntilDone() {
        return sendPollRequestWithDelay()
            .repeat()
            .takeUntil(new Predicate<HttpResponse>() {
                @Override
                public boolean test(HttpResponse ignored) {
                    return isDone();
                }
            })
            .lastOrError();
    }
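
The polling pattern above (repeat a request, stop once a condition holds, keep the last response) can be modelled in plain Java. This is a rough sketch under stated assumptions rather than the Azure runtime's code: `PollUntilDoneModel`, its `pollUntilDone` helper, and the `maxPolls` safety cap are hypothetical and do not appear in the original, which repeats without a limit and checks its own `isDone()` state instead of inspecting the response.

```java
import java.util.function.Predicate;
import java.util.function.Supplier;

/** A toy model of repeat().takeUntil(...).lastOrError(); hypothetical, not RxJava or Azure code. */
public class PollUntilDoneModel {

    /**
     * Invokes {@code poll} again and again (one "resubscription" per call) and
     * returns the first response for which {@code isDone} holds. As with
     * takeUntil(predicate), the matching response itself is kept, so it is the
     * last value seen. {@code maxPolls} is an assumed safety cap.
     */
    static <T> T pollUntilDone(Supplier<T> poll, Predicate<T> isDone, int maxPolls) {
        for (int i = 0; i < maxPolls; i++) {
            T response = poll.get();
            if (isDone.test(response)) {
                return response;
            }
        }
        throw new IllegalStateException("not done after " + maxPolls + " polls");
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Hypothetical operation that reports "DONE" on the third poll.
        String last = pollUntilDone(
                () -> ++calls[0] < 3 ? "RUNNING" : "DONE",
                "DONE"::equals,
                10);
        System.out.println(last + " after " + calls[0] + " polls"); // prints DONE after 3 polls
    }
}
```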

Code example source: com.microsoft.azure.v2/azure-client-runtime

    Observable<OperationStatus<Object>> pollUntilDoneWithStatusUpdates(final HttpRequest originalHttpRequest, final SwaggerMethodParser methodParser, final Type operationStatusResultType) {
        return sendPollRequestWithDelay()
            .flatMap(new Function<HttpResponse, Observable<OperationStatus<Object>>>() {
                @Override
                public Observable<OperationStatus<Object>> apply(HttpResponse httpResponse) {
                    return createOperationStatusObservable(originalHttpRequest, httpResponse, methodParser, operationStatusResultType);
                }
            })
            .repeat()
            .takeUntil(new Predicate<OperationStatus<Object>>() {
                @Override
                public boolean test(OperationStatus<Object> operationStatus) {
                    return isDone();
                }
            });
    }
