io.reactivex.Flowable.subscribeWith()方法的使用及代码示例

x33g5p2x  于2022-01-19 转载在 其他  
字(7.6k)|赞(0)|评价(0)|浏览(262)

本文整理了Java中io.reactivex.Flowable.subscribeWith()方法的一些代码示例,展示了Flowable.subscribeWith()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Flowable.subscribeWith()方法的具体详情如下:
包路径:io.reactivex.Flowable
类名称:Flowable
方法名:subscribeWith

Flowable.subscribeWith介绍

[英]Subscribes a given Subscriber (subclass) to this Flowable and returns the given Subscriber as is.

Usage example:

Flowable<Integer> source = Flowable.range(1, 10); 
CompositeDisposable composite = new CompositeDisposable(); 
ResourceSubscriber<Integer> rs = new ResourceSubscriber<>() { 
// ... 
}; 
composite.add(source.subscribeWith(rs));

Backpressure: The backpressure behavior/expectation is determined by the supplied Subscriber. Scheduler: subscribeWith does not operate by default on a particular Scheduler.
[中]将给定订阅者(子类)订阅到此Flowable并按原样返回给定订阅者。
用法示例:

Flowable<Integer> source = Flowable.range(1, 10); 
CompositeDisposable composite = new CompositeDisposable(); 
ResourceSubscriber<Integer> rs = new ResourceSubscriber<>() { 
// ... 
}; 
composite.add(source.subscribeWith(rs));

背压:背压行为/预期由提供的订户决定。调度程序:默认情况下,SubscribeWidth不会在特定调度程序上运行。

代码示例

代码示例来源:origin: ReactiveX/RxJava

@Test
public void range() {
  Flowable.range(1, 5)
  .doAfterNext(afterNext)
  .subscribeWith(ts)
  .assertResult(1, 2, 3, 4, 5);
  assertEquals(Arrays.asList(1, -1, 2, -2, 3, -3, 4, -4, 5, -5), values);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void empty() {
  Flowable.<Integer>empty()
  .doAfterNext(afterNext)
  .subscribeWith(ts)
  .assertResult();
  assertTrue(values.isEmpty());
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void just() {
  Flowable.just(1)
  .doAfterNext(afterNext)
  .subscribeWith(ts)
  .assertResult(1);
  assertEquals(Arrays.asList(1, -1), values);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void withFlowable() {
  Flowable.range(1, 10)
  .subscribeWith(new TestSubscriber<Integer>())
  .assertResult(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void error() {
  Flowable.<Integer>error(new TestException())
  .doAfterNext(afterNext)
  .subscribeWith(ts)
  .assertFailure(TestException.class);
  assertTrue(values.isEmpty());
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void rangeConditional() {
  Flowable.range(1, 5)
  .doAfterNext(afterNext)
  .filter(Functions.alwaysTrue())
  .subscribeWith(ts)
  .assertResult(1, 2, 3, 4, 5);
  assertEquals(Arrays.asList(1, -1, 2, -2, 3, -3, 4, -4, 5, -5), values);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void emptyConditional() {
  Flowable.<Integer>empty()
  .doAfterNext(afterNext)
  .filter(Functions.alwaysTrue())
  .subscribeWith(ts)
  .assertResult();
  assertTrue(values.isEmpty());
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void justConditional() {
  Flowable.just(1)
  .doAfterNext(afterNext)
  .filter(Functions.alwaysTrue())
  .subscribeWith(ts)
  .assertResult(1);
  assertEquals(Arrays.asList(1, -1), values);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void errorConditional() {
  Flowable.<Integer>error(new TestException())
  .doAfterNext(afterNext)
  .filter(Functions.alwaysTrue())
  .subscribeWith(ts)
  .assertFailure(TestException.class);
  assertTrue(values.isEmpty());
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void fusionCrash() {
  MulticastProcessor<Integer> mp = Flowable.range(1, 5)
  .map(new Function<Integer, Integer>() {
    @Override
    public Integer apply(Integer v) throws Exception {
      throw new IOException();
    }
  })
  .subscribeWith(MulticastProcessor.<Integer>create());
  mp.test().assertFailure(IOException.class);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void conditionalSlowPathCancel() {
  Flowable.fromArray(new Integer[] { 1, 2, 3, 4, 5 })
  .filter(Functions.alwaysTrue())
  .subscribeWith(new TestSubscriber<Integer>(5L) {
    @Override
    public void onNext(Integer t) {
      super.onNext(t);
      if (t == 1) {
        cancel();
        onComplete();
      }
    }
  })
  .assertResult(1);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void disposeInOnNext() {
  final TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
  BehaviorProcessor.createDefault(1)
  .debounce(new Function<Integer, Flowable<Object>>() {
    @Override
    public Flowable<Object> apply(Integer o) throws Exception {
      ts.cancel();
      return Flowable.never();
    }
  })
  .subscribeWith(ts)
  .assertEmpty();
  assertTrue(ts.isDisposed());
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void disposedInOnComplete() {
  final TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
  new Flowable<Integer>() {
    @Override
    protected void subscribeActual(Subscriber<? super Integer> subscriber) {
      subscriber.onSubscribe(new BooleanSubscription());
      ts.cancel();
      subscriber.onComplete();
    }
  }
  .debounce(Functions.justFunction(Flowable.never()))
  .subscribeWith(ts)
  .assertEmpty();
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void onSuccessSlowPath() {
  final PublishProcessor<Integer> pp = PublishProcessor.create();
  final SingleSubject<Integer> cs = SingleSubject.create();
  TestSubscriber<Integer> ts = pp.mergeWith(cs).subscribeWith(new TestSubscriber<Integer>() {
    @Override
    public void onNext(Integer t) {
      super.onNext(t);
      if (t == 1) {
        cs.onSuccess(2);
      }
    }
  });
  pp.onNext(1);
  pp.onNext(3);
  pp.onComplete();
  ts.assertResult(1, 2, 3);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void nextWindowMissingBackpressureDrainOnSize() {
  final PublishProcessor<Integer> pp = PublishProcessor.create();
  TestSubscriber<Flowable<Integer>> ts = pp.window(1, TimeUnit.MINUTES, 1)
  .subscribeWith(new TestSubscriber<Flowable<Integer>>(2) {
    int calls;
    @Override
    public void onNext(Flowable<Integer> t) {
      super.onNext(t);
      if (++calls == 2) {
        pp.onNext(2);
      }
    }
  });
  pp.onNext(1);
  ts.assertValueCount(2)
  .assertError(MissingBackpressureException.class)
  .assertNotComplete();
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void onSuccessSlowPath() {
  final PublishProcessor<Integer> pp = PublishProcessor.create();
  final MaybeSubject<Integer> cs = MaybeSubject.create();
  TestSubscriber<Integer> ts = pp.mergeWith(cs).subscribeWith(new TestSubscriber<Integer>() {
    @Override
    public void onNext(Integer t) {
      super.onNext(t);
      if (t == 1) {
        cs.onSuccess(2);
      }
    }
  });
  pp.onNext(1);
  pp.onNext(3);
  pp.onComplete();
  ts.assertResult(1, 2, 3);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void drainExactRequestCancel() {
  final PublishProcessor<Integer> pp = PublishProcessor.create();
  final SingleSubject<Integer> cs = SingleSubject.create();
  TestSubscriber<Integer> ts = pp.mergeWith(cs)
      .limit(2)
      .subscribeWith(new TestSubscriber<Integer>(2) {
    @Override
    public void onNext(Integer t) {
      super.onNext(t);
      if (t == 1) {
        cs.onSuccess(2);
      }
    }
  });
  pp.onNext(1);
  pp.onComplete();
  ts.request(2);
  ts.assertResult(1, 2);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void drainExactRequestCancel() {
  final PublishProcessor<Integer> pp = PublishProcessor.create();
  final MaybeSubject<Integer> cs = MaybeSubject.create();
  TestSubscriber<Integer> ts = pp.mergeWith(cs)
      .limit(2)
      .subscribeWith(new TestSubscriber<Integer>(2) {
    @Override
    public void onNext(Integer t) {
      super.onNext(t);
      if (t == 1) {
        cs.onSuccess(2);
      }
    }
  });
  pp.onNext(1);
  pp.onComplete();
  ts.request(2);
  ts.assertResult(1, 2);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void onSuccessSlowPathBackpressured() {
  final PublishProcessor<Integer> pp = PublishProcessor.create();
  final MaybeSubject<Integer> cs = MaybeSubject.create();
  TestSubscriber<Integer> ts = pp.mergeWith(cs).subscribeWith(new TestSubscriber<Integer>(1) {
    @Override
    public void onNext(Integer t) {
      super.onNext(t);
      if (t == 1) {
        cs.onSuccess(2);
      }
    }
  });
  pp.onNext(1);
  pp.onNext(3);
  pp.onComplete();
  ts.request(2);
  ts.assertResult(1, 2, 3);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void onNextSlowPath() {
  final PublishProcessor<Integer> pp = PublishProcessor.create();
  final MaybeSubject<Integer> cs = MaybeSubject.create();
  TestSubscriber<Integer> ts = pp.mergeWith(cs).subscribeWith(new TestSubscriber<Integer>() {
    @Override
    public void onNext(Integer t) {
      super.onNext(t);
      if (t == 1) {
        pp.onNext(2);
      }
    }
  });
  pp.onNext(1);
  cs.onSuccess(3);
  pp.onNext(4);
  pp.onComplete();
  ts.assertResult(1, 2, 3, 4);
}

相关文章

Flowable类方法