Usage and code examples of the io.reactivex.Flowable.limit() method

x33g5p2x, reposted on 2022-01-19 under Other

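This article collects code examples of the io.reactivex.Flowable.limit() method in Java and shows how Flowable.limit() is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they should serve as useful references. Details of the Flowable.limit() method are as follows:
Package path: io.reactivex.Flowable
Class name: Flowable
Method name: limit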

Introduction to Flowable.limit

Limits both the number of upstream items (after which the sequence completes) and the total downstream request amount requested from the upstream to possibly prevent the creation of excess items by the upstream.

The operator requests at most the given count of items from upstream even if the downstream requests more than that. For example, given a limit(5), if the downstream requests 1, a request of 1 is submitted to the upstream and the operator remembers that only 4 items can be requested from now on. A request of 5 at this point will request 4 from the upstream and any subsequent requests will be ignored.
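The bookkeeping described above can be sketched in plain Java. This is only a toy model of the arithmetic, not RxJava's implementation: the class name LimitRequestModel and its methods are hypothetical and exist purely for illustration.

```java
// Hypothetical model of the request accounting described for limit(n).
// It is NOT RxJava code; it only mirrors the arithmetic in the text.
final class LimitRequestModel {
    // How many items may still be requested from upstream.
    private long remaining;

    LimitRequestModel(long limit) {
        if (limit < 0) {
            throw new IllegalArgumentException("limit >= 0 required");
        }
        this.remaining = limit;
    }

    // Returns the amount forwarded upstream for a downstream request of n.
    long request(long n) {
        if (n <= 0) {
            throw new IllegalArgumentException("n > 0 required");
        }
        long forwarded = Math.min(n, remaining);
        remaining -= forwarded;
        return forwarded;
    }

    long remaining() {
        return remaining;
    }

    public static void main(String[] args) {
        LimitRequestModel limit = new LimitRequestModel(5);
        System.out.println(limit.request(1)); // forwards 1, four remain
        System.out.println(limit.request(5)); // forwards only 4, capped by the limit
        System.out.println(limit.request(3)); // forwards 0, the request is ignored
    }
}
```

The three calls in main follow the limit(5) walk-through from the text: 1 is forwarded, then 4, then nothing.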

Note that requests are negotiated on an operator boundary and limit's amount may not be preserved further upstream. For example, source.observeOn(Schedulers.computation()).limit(5) will still request the default (128) elements from the given source.
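To make the operator-boundary point concrete, here is a toy model in plain Java, assuming each operator can be reduced to a function from the demand it receives to the demand it forwards upstream. The names RequestBoundaryModel, limit, prefetching, and demandSeenBySource are hypothetical, and real operators such as observeOn do considerably more than this.

```java
import java.util.function.LongUnaryOperator;

// Toy model, NOT RxJava code: each operator maps the demand it receives
// from downstream to the demand it forwards upstream.
final class RequestBoundaryModel {

    // Models limit(n) for a single request: never forwards more than n.
    static LongUnaryOperator limit(long n) {
        return demand -> Math.min(demand, n);
    }

    // Models a prefetching boundary (such as observeOn): it ignores the
    // downstream amount and asks for its own buffer size instead.
    static LongUnaryOperator prefetching(long bufferSize) {
        return demand -> bufferSize;
    }

    // Applies the operators in order, from the subscriber back to the source.
    static long demandSeenBySource(long subscriberDemand,
                                   LongUnaryOperator... downstreamToUpstream) {
        long demand = subscriberDemand;
        for (LongUnaryOperator op : downstreamToUpstream) {
            demand = op.applyAsLong(demand);
        }
        return demand;
    }

    public static void main(String[] args) {
        // Shape of source.observeOn(...).limit(5): limit sits downstream of the boundary.
        long afterBoundary = demandSeenBySource(Long.MAX_VALUE, limit(5), prefetching(128));
        // Shape of source.limit(5).observeOn(...): limit sits next to the source.
        long nextToSource = demandSeenBySource(Long.MAX_VALUE, prefetching(128), limit(5));
        System.out.println(afterBoundary); // 128
        System.out.println(nextToSource);  // 5
    }
}
```

In this model the source only sees the cap of 5 when limit is the operator closest to it, which matches the note that limit's amount may not be preserved further upstream.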

The main use of this operator is with sources that are async boundaries that don't interfere with request amounts, such as certain Flowable-based network endpoints that relay downstream request amounts unchanged and are, therefore, prone to trigger excessive item creation/transmission over the network.

Backpressure: The operator requests a total of the given count items from the upstream.
Scheduler: limit does not operate by default on a particular Scheduler.

Code examples

Code example source: ReactiveX/RxJava

@Override
public Publisher<Integer> createPublisher(long elements) {
  return Flowable.range(0, (int) elements * 2).limit(elements);
}

Code example source: ReactiveX/RxJava

@Test
public void badRequest() {
  TestHelper.assertBadRequestReported(Flowable.range(1, 5).limit(3));
}

Code example source: ReactiveX/RxJava

@Test
public void longerSequence() {
  Flowable.range(1, 6)
  .doOnRequest(this)
  .limit(5)
  .test()
  .assertResult(1, 2, 3, 4, 5);
  assertEquals(5, requests.get(0).intValue());
}

Code example source: ReactiveX/RxJava

@Test
public void shorterSequence() {
  Flowable.range(1, 5)
  .doOnRequest(this)
  .limit(6)
  .test()
  .assertResult(1, 2, 3, 4, 5);
  assertEquals(6, requests.get(0).intValue());
}

Code example source: ReactiveX/RxJava

@Test
public void exactSequence() {
  Flowable.range(1, 5)
  .doOnRequest(this)
  .doOnCancel(this)
  .limit(5)
  .test()
  .assertResult(1, 2, 3, 4, 5);
  assertEquals(2, requests.size());
  assertEquals(5, requests.get(0).intValue());
  assertEquals(CANCELLED, requests.get(1));
}

Code example source: TeamNewPipe/NewPipe

protected void setupNotification() {
  notificationManager = NotificationManagerCompat.from(this);
  notificationBuilder = createNotification();
  startForeground(getNotificationId(), notificationBuilder.build());
  final Function<Flowable<String>, Publisher<String>> throttleAfterFirstEmission = flow -> flow.limit(1)
      .concatWith(flow.skip(1).throttleLast(NOTIFICATION_SAMPLING_PERIOD, TimeUnit.MILLISECONDS));
  disposables.add(notificationUpdater
      .filter(s -> !s.isEmpty())
      .publish(throttleAfterFirstEmission)
      .observeOn(AndroidSchedulers.mainThread())
      .subscribe(this::updateNotification));
}

Code example source: ReactiveX/RxJava

@Test
public void limitZero() {
  Flowable.range(1, 5)
  .doOnCancel(this)
  .doOnRequest(this)
  .limit(0)
  .test()
  .assertResult();
  assertEquals(1, requests.size());
  assertEquals(CANCELLED, requests.get(0));
}

Code example source: ReactiveX/RxJava

@Test
public void error() {
  Flowable.error(new TestException())
  .limit(5)
  .test()
  .assertFailure(TestException.class);
}

Code example source: ReactiveX/RxJava

@Test
public void limit() {
  Flowable.range(1, 5)
  .concatMapSingle(new Function<Integer, SingleSource<Integer>>() {
    @Override
    public SingleSource<Integer> apply(Integer v)
        throws Exception {
      return Single.just(v);
    }
  })
  .limit(3)
  .test()
  .assertResult(1, 2, 3);
}

Code example source: ReactiveX/RxJava

@Test
public void limitAndTake() {
  Flowable.range(1, 5)
  .doOnCancel(this)
  .doOnRequest(this)
  .limit(6)
  .take(5)
  .test()
  .assertResult(1, 2, 3, 4, 5);
  assertEquals(Arrays.asList(6L, CANCELLED), requests);
}

Code example source: ReactiveX/RxJava

@Test
public void limit() {
  Flowable.range(1, 5)
  .switchMapMaybe(new Function<Integer, MaybeSource<Integer>>() {
    @Override
    public MaybeSource<Integer> apply(Integer v)
        throws Exception {
      return Maybe.just(v);
    }
  })
  .limit(3)
  .test()
  .assertResult(1, 2, 3);
}

Code example source: ReactiveX/RxJava

@Test
public void limit() {
  Flowable.range(1, 5)
  .switchMapSingle(new Function<Integer, SingleSource<Integer>>() {
    @Override
    public SingleSource<Integer> apply(Integer v)
        throws Exception {
      return Single.just(v);
    }
  })
  .limit(3)
  .test()
  .assertResult(1, 2, 3);
}

Code example source: ReactiveX/RxJava

@Test
public void limit() {
  Flowable.range(1, 5)
  .concatMapMaybe(new Function<Integer, MaybeSource<Integer>>() {
    @Override
    public MaybeSource<Integer> apply(Integer v)
        throws Exception {
      return Maybe.just(v);
    }
  })
  .limit(3)
  .test()
  .assertResult(1, 2, 3);
}

Code example source: ReactiveX/RxJava

@Test
public void requestRace() {
  for (int i = 0; i < TestHelper.RACE_DEFAULT_LOOPS; i++) {
    final TestSubscriber<Integer> ts = Flowable.range(1, 10)
        .limit(5)
        .test(0L);
    Runnable r = new Runnable() {
      @Override
      public void run() {
        ts.request(3);
      }
    };
    TestHelper.race(r, r);
    ts.assertResult(1, 2, 3, 4, 5);
  }
}

Code example source: ReactiveX/RxJava

@Test
public void limitStep() {
  TestSubscriber<Integer> ts = Flowable.range(1, 6)
  .doOnRequest(this)
  .limit(5)
  .test(0L);
  assertEquals(0, requests.size());
  ts.request(1);
  ts.assertValue(1);
  ts.request(2);
  ts.assertValues(1, 2, 3);
  ts.request(3);
  ts.assertResult(1, 2, 3, 4, 5);
  assertEquals(Arrays.asList(1L, 2L, 2L), requests);
}

Code example source: ReactiveX/RxJava

@Test
public void errorAfterLimitReached() {
  List<Throwable> errors = TestHelper.trackPluginErrors();
  try {
    Flowable.error(new TestException())
    .limit(0)
    .test()
    .assertResult();

    TestHelper.assertUndeliverable(errors, 0, TestException.class);
  } finally {
    RxJavaPlugins.reset();
  }
}

Code example source: ReactiveX/RxJava

@Test
public void noOverrequest() {
  PublishProcessor<Integer> pp = PublishProcessor.create();
  TestSubscriber<Integer> ts = pp
      .doOnRequest(this)
      .limit(5)
      .test(0L);
  ts.request(5);
  ts.request(10);
  assertTrue(pp.offer(1));
  pp.onComplete();
  ts.assertResult(1);
}

Code example source: ReactiveX/RxJava

@Test
public void drainExactRequestCancel() {
  final PublishProcessor<Integer> pp = PublishProcessor.create();
  final SingleSubject<Integer> cs = SingleSubject.create();
  TestSubscriber<Integer> ts = pp.mergeWith(cs)
      .limit(2)
      .subscribeWith(new TestSubscriber<Integer>(2) {
    @Override
    public void onNext(Integer t) {
      super.onNext(t);
      if (t == 1) {
        cs.onSuccess(2);
      }
    }
  });
  pp.onNext(1);
  pp.onComplete();
  ts.request(2);
  ts.assertResult(1, 2);
}

Code example source: ReactiveX/RxJava

@Test
public void drainExactRequestCancel() {
  final PublishProcessor<Integer> pp = PublishProcessor.create();
  final MaybeSubject<Integer> cs = MaybeSubject.create();
  TestSubscriber<Integer> ts = pp.mergeWith(cs)
      .limit(2)
      .subscribeWith(new TestSubscriber<Integer>(2) {
    @Override
    public void onNext(Integer t) {
      super.onNext(t);
      if (t == 1) {
        cs.onSuccess(2);
      }
    }
  });
  pp.onNext(1);
  pp.onComplete();
  ts.request(2);
  ts.assertResult(1, 2);
}

Code example source: ReactiveX/RxJava

@Test
@SuppressWarnings("unchecked")
public void openCloseLimit() {
  PublishProcessor<Integer> source = PublishProcessor.create();
  PublishProcessor<Integer> openIndicator = PublishProcessor.create();
  PublishProcessor<Integer> closeIndicator = PublishProcessor.create();
  TestSubscriber<List<Integer>> ts = source
  .buffer(openIndicator, Functions.justFunction(closeIndicator))
  .limit(1)
  .test(2);
  openIndicator.onNext(1);
  closeIndicator.onComplete();
  assertFalse(source.hasSubscribers());
  assertFalse(openIndicator.hasSubscribers());
  assertFalse(closeIndicator.hasSubscribers());
  ts.assertResult(Collections.<Integer>emptyList());
}
