io.reactivex.Flowable.subscribeWith()方法的使用及代码示例

x33g5p2x  于2022-01-19 转载在 其他  
字(7.6k)|赞(0)|评价(0)|浏览(300)

本文整理了Java中io.reactivex.Flowable.subscribeWith()方法的一些代码示例,展示了Flowable.subscribeWith()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Flowable.subscribeWith()方法的具体详情如下:
包路径:io.reactivex.Flowable
类名称:Flowable
方法名:subscribeWith

Flowable.subscribeWith介绍

[英]Subscribes a given Subscriber (subclass) to this Flowable and returns the given Subscriber as is.

Usage example:

  1. Flowable<Integer> source = Flowable.range(1, 10);
  2. CompositeDisposable composite = new CompositeDisposable();
  3. ResourceSubscriber<Integer> rs = new ResourceSubscriber<>() {
  4. // ...
  5. };
  6. composite.add(source.subscribeWith(rs));

Backpressure: The backpressure behavior/expectation is determined by the supplied Subscriber. Scheduler: subscribeWith does not operate by default on a particular Scheduler.
[中]将给定订阅者(子类)订阅到此Flowable并按原样返回给定订阅者。
用法示例:

  1. Flowable<Integer> source = Flowable.range(1, 10);
  2. CompositeDisposable composite = new CompositeDisposable();
  3. ResourceSubscriber<Integer> rs = new ResourceSubscriber<>() {
  4. // ...
  5. };
  6. composite.add(source.subscribeWith(rs));

背压:背压行为/预期由提供的订户决定。调度程序:默认情况下,SubscribeWidth不会在特定调度程序上运行。

代码示例

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void range() {
  3. Flowable.range(1, 5)
  4. .doAfterNext(afterNext)
  5. .subscribeWith(ts)
  6. .assertResult(1, 2, 3, 4, 5);
  7. assertEquals(Arrays.asList(1, -1, 2, -2, 3, -3, 4, -4, 5, -5), values);
  8. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void empty() {
  3. Flowable.<Integer>empty()
  4. .doAfterNext(afterNext)
  5. .subscribeWith(ts)
  6. .assertResult();
  7. assertTrue(values.isEmpty());
  8. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void just() {
  3. Flowable.just(1)
  4. .doAfterNext(afterNext)
  5. .subscribeWith(ts)
  6. .assertResult(1);
  7. assertEquals(Arrays.asList(1, -1), values);
  8. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void withFlowable() {
  3. Flowable.range(1, 10)
  4. .subscribeWith(new TestSubscriber<Integer>())
  5. .assertResult(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
  6. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void error() {
  3. Flowable.<Integer>error(new TestException())
  4. .doAfterNext(afterNext)
  5. .subscribeWith(ts)
  6. .assertFailure(TestException.class);
  7. assertTrue(values.isEmpty());
  8. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void rangeConditional() {
  3. Flowable.range(1, 5)
  4. .doAfterNext(afterNext)
  5. .filter(Functions.alwaysTrue())
  6. .subscribeWith(ts)
  7. .assertResult(1, 2, 3, 4, 5);
  8. assertEquals(Arrays.asList(1, -1, 2, -2, 3, -3, 4, -4, 5, -5), values);
  9. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void emptyConditional() {
  3. Flowable.<Integer>empty()
  4. .doAfterNext(afterNext)
  5. .filter(Functions.alwaysTrue())
  6. .subscribeWith(ts)
  7. .assertResult();
  8. assertTrue(values.isEmpty());
  9. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void justConditional() {
  3. Flowable.just(1)
  4. .doAfterNext(afterNext)
  5. .filter(Functions.alwaysTrue())
  6. .subscribeWith(ts)
  7. .assertResult(1);
  8. assertEquals(Arrays.asList(1, -1), values);
  9. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void errorConditional() {
  3. Flowable.<Integer>error(new TestException())
  4. .doAfterNext(afterNext)
  5. .filter(Functions.alwaysTrue())
  6. .subscribeWith(ts)
  7. .assertFailure(TestException.class);
  8. assertTrue(values.isEmpty());
  9. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void fusionCrash() {
  3. MulticastProcessor<Integer> mp = Flowable.range(1, 5)
  4. .map(new Function<Integer, Integer>() {
  5. @Override
  6. public Integer apply(Integer v) throws Exception {
  7. throw new IOException();
  8. }
  9. })
  10. .subscribeWith(MulticastProcessor.<Integer>create());
  11. mp.test().assertFailure(IOException.class);
  12. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void conditionalSlowPathCancel() {
  3. Flowable.fromArray(new Integer[] { 1, 2, 3, 4, 5 })
  4. .filter(Functions.alwaysTrue())
  5. .subscribeWith(new TestSubscriber<Integer>(5L) {
  6. @Override
  7. public void onNext(Integer t) {
  8. super.onNext(t);
  9. if (t == 1) {
  10. cancel();
  11. onComplete();
  12. }
  13. }
  14. })
  15. .assertResult(1);
  16. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void disposeInOnNext() {
  3. final TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
  4. BehaviorProcessor.createDefault(1)
  5. .debounce(new Function<Integer, Flowable<Object>>() {
  6. @Override
  7. public Flowable<Object> apply(Integer o) throws Exception {
  8. ts.cancel();
  9. return Flowable.never();
  10. }
  11. })
  12. .subscribeWith(ts)
  13. .assertEmpty();
  14. assertTrue(ts.isDisposed());
  15. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void disposedInOnComplete() {
  3. final TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
  4. new Flowable<Integer>() {
  5. @Override
  6. protected void subscribeActual(Subscriber<? super Integer> subscriber) {
  7. subscriber.onSubscribe(new BooleanSubscription());
  8. ts.cancel();
  9. subscriber.onComplete();
  10. }
  11. }
  12. .debounce(Functions.justFunction(Flowable.never()))
  13. .subscribeWith(ts)
  14. .assertEmpty();
  15. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void onSuccessSlowPath() {
  3. final PublishProcessor<Integer> pp = PublishProcessor.create();
  4. final SingleSubject<Integer> cs = SingleSubject.create();
  5. TestSubscriber<Integer> ts = pp.mergeWith(cs).subscribeWith(new TestSubscriber<Integer>() {
  6. @Override
  7. public void onNext(Integer t) {
  8. super.onNext(t);
  9. if (t == 1) {
  10. cs.onSuccess(2);
  11. }
  12. }
  13. });
  14. pp.onNext(1);
  15. pp.onNext(3);
  16. pp.onComplete();
  17. ts.assertResult(1, 2, 3);
  18. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void nextWindowMissingBackpressureDrainOnSize() {
  3. final PublishProcessor<Integer> pp = PublishProcessor.create();
  4. TestSubscriber<Flowable<Integer>> ts = pp.window(1, TimeUnit.MINUTES, 1)
  5. .subscribeWith(new TestSubscriber<Flowable<Integer>>(2) {
  6. int calls;
  7. @Override
  8. public void onNext(Flowable<Integer> t) {
  9. super.onNext(t);
  10. if (++calls == 2) {
  11. pp.onNext(2);
  12. }
  13. }
  14. });
  15. pp.onNext(1);
  16. ts.assertValueCount(2)
  17. .assertError(MissingBackpressureException.class)
  18. .assertNotComplete();
  19. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void onSuccessSlowPath() {
  3. final PublishProcessor<Integer> pp = PublishProcessor.create();
  4. final MaybeSubject<Integer> cs = MaybeSubject.create();
  5. TestSubscriber<Integer> ts = pp.mergeWith(cs).subscribeWith(new TestSubscriber<Integer>() {
  6. @Override
  7. public void onNext(Integer t) {
  8. super.onNext(t);
  9. if (t == 1) {
  10. cs.onSuccess(2);
  11. }
  12. }
  13. });
  14. pp.onNext(1);
  15. pp.onNext(3);
  16. pp.onComplete();
  17. ts.assertResult(1, 2, 3);
  18. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void drainExactRequestCancel() {
  3. final PublishProcessor<Integer> pp = PublishProcessor.create();
  4. final SingleSubject<Integer> cs = SingleSubject.create();
  5. TestSubscriber<Integer> ts = pp.mergeWith(cs)
  6. .limit(2)
  7. .subscribeWith(new TestSubscriber<Integer>(2) {
  8. @Override
  9. public void onNext(Integer t) {
  10. super.onNext(t);
  11. if (t == 1) {
  12. cs.onSuccess(2);
  13. }
  14. }
  15. });
  16. pp.onNext(1);
  17. pp.onComplete();
  18. ts.request(2);
  19. ts.assertResult(1, 2);
  20. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void drainExactRequestCancel() {
  3. final PublishProcessor<Integer> pp = PublishProcessor.create();
  4. final MaybeSubject<Integer> cs = MaybeSubject.create();
  5. TestSubscriber<Integer> ts = pp.mergeWith(cs)
  6. .limit(2)
  7. .subscribeWith(new TestSubscriber<Integer>(2) {
  8. @Override
  9. public void onNext(Integer t) {
  10. super.onNext(t);
  11. if (t == 1) {
  12. cs.onSuccess(2);
  13. }
  14. }
  15. });
  16. pp.onNext(1);
  17. pp.onComplete();
  18. ts.request(2);
  19. ts.assertResult(1, 2);
  20. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void onSuccessSlowPathBackpressured() {
  3. final PublishProcessor<Integer> pp = PublishProcessor.create();
  4. final MaybeSubject<Integer> cs = MaybeSubject.create();
  5. TestSubscriber<Integer> ts = pp.mergeWith(cs).subscribeWith(new TestSubscriber<Integer>(1) {
  6. @Override
  7. public void onNext(Integer t) {
  8. super.onNext(t);
  9. if (t == 1) {
  10. cs.onSuccess(2);
  11. }
  12. }
  13. });
  14. pp.onNext(1);
  15. pp.onNext(3);
  16. pp.onComplete();
  17. ts.request(2);
  18. ts.assertResult(1, 2, 3);
  19. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void onNextSlowPath() {
  3. final PublishProcessor<Integer> pp = PublishProcessor.create();
  4. final MaybeSubject<Integer> cs = MaybeSubject.create();
  5. TestSubscriber<Integer> ts = pp.mergeWith(cs).subscribeWith(new TestSubscriber<Integer>() {
  6. @Override
  7. public void onNext(Integer t) {
  8. super.onNext(t);
  9. if (t == 1) {
  10. pp.onNext(2);
  11. }
  12. }
  13. });
  14. pp.onNext(1);
  15. cs.onSuccess(3);
  16. pp.onNext(4);
  17. pp.onComplete();
  18. ts.assertResult(1, 2, 3, 4);
  19. }

相关文章

Flowable类方法