io.reactivex.Flowable.subscribeOn()方法的使用及代码示例

x33g5p2x  于2022-01-19 转载在 其他  
字(5.9k)|赞(0)|评价(0)|浏览(357)

本文整理了Java中io.reactivex.Flowable.subscribeOn()方法的一些代码示例,展示了Flowable.subscribeOn()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Flowable.subscribeOn()方法的具体详情如下:
包路径:io.reactivex.Flowable
类名称:Flowable
方法名:subscribeOn

Flowable.subscribeOn介绍

[英]Asynchronously subscribes Subscribers to this Publisher on the specified Scheduler.

If there is a #create(FlowableOnSubscribe,BackpressureStrategy) type source up in the chain, it is recommended to use subscribeOn(scheduler, false) instead to avoid same-pool deadlock because requests may pile up behind an eager/blocking emitter.

Backpressure: The operator doesn't interfere with backpressure which is determined by the source Publisher's backpressure behavior. Scheduler: You specify which Scheduler this operator will use.
[中]在指定的计划程序上异步订阅此发布服务器的订阅服务器。
如果链中有一个#create(FlowableOnSubscribe,BackPressureStragy)类型的源,建议使用subscribeOn(scheduler,false)来避免相同的池死锁,因为请求可能堆积在急切/阻塞发射器后面。
背压:操作员不会干扰由源发布者的背压行为确定的背压。计划程序:指定此操作员将使用的计划程序。

代码示例

代码示例来源:origin: ReactiveX/RxJava

@Override
  public Publisher<Integer> apply(Flowable<Integer> f) throws Exception {
    return f.subscribeOn(Schedulers.single());
  }
});

代码示例来源:origin: ReactiveX/RxJava

@Override
  public Publisher<Integer> apply(Integer i) {
    return incrementingIntegers(new AtomicInteger())
        .take(10)
        .subscribeOn(Schedulers.computation());
  }
}

代码示例来源:origin: ReactiveX/RxJava

@Override
  public Flowable<Integer> apply(Integer t1) {
    return composer(Flowable.range(t1 * 10, 2), subscriptionCount, m)
        .subscribeOn(Schedulers.computation());
  }
}, m);

代码示例来源:origin: ReactiveX/RxJava

@Override
  public Flowable<Integer> apply(Integer t1) {
    return composer(Flowable.range(t1 * 10, 2), subscriptionCount, m)
        .subscribeOn(Schedulers.computation());
  }
}, new BiFunction<Integer, Integer, Integer>() {

代码示例来源:origin: ReactiveX/RxJava

@Override
  public Flowable<Integer> apply(Integer t) {
    return Flowable.just(1).subscribeOn(Schedulers.computation());
  }
}).subscribe(ts);

代码示例来源:origin: ReactiveX/RxJava

@Override
  public Flowable<Integer> apply(Integer t) {
    return Flowable.range(1, 1000).subscribeOn(Schedulers.computation());
  }
}).observeOn(Schedulers.single())

代码示例来源:origin: ReactiveX/RxJava

@Override
public Flowable<String> apply(Integer id) {
  return Flowable.fromIterable(Arrays.asList("a-" + id, "b-" + id)).subscribeOn(getScheduler())
      .map(new Function<String, String>() {
        @Override
        public String apply(String s) {
          return "names=>" + s;
        }
      });
}

代码示例来源:origin: ReactiveX/RxJava

@Test(/* timeout = 1000, */expected = RuntimeException.class)
public void testHasNextThrows() {
  TestScheduler scheduler = new TestScheduler();
  Flowable<Long> source = Flowable.<Long> error(new RuntimeException("Forced failure!")).subscribeOn(scheduler);
  Iterable<Long> iter = source.blockingLatest();
  Iterator<Long> it = iter.iterator();
  scheduler.advanceTimeBy(1, TimeUnit.SECONDS);
  it.hasNext();
}

代码示例来源:origin: ReactiveX/RxJava

@Test(timeout = 1000, expected = RuntimeException.class)
public void testNextThrows() {
  TestScheduler scheduler = new TestScheduler();
  Flowable<Long> source = Flowable.<Long> error(new RuntimeException("Forced failure!")).subscribeOn(scheduler);
  Iterable<Long> iter = source.blockingLatest();
  Iterator<Long> it = iter.iterator();
  scheduler.advanceTimeBy(1, TimeUnit.SECONDS);
  it.next();
}

代码示例来源:origin: ReactiveX/RxJava

@Override
  public CompletableSource apply(Integer v) throws Exception {
    return Flowable.range(1, 100).subscribeOn(Schedulers.computation()).ignoreElements();
  }
}, false, 3)

代码示例来源:origin: ReactiveX/RxJava

@Override
  public CompletableSource apply(Integer v) throws Exception {
    return Flowable.range(1, 100).subscribeOn(Schedulers.computation()).ignoreElements();
  }
}).toFlowable()

代码示例来源:origin: ReactiveX/RxJava

@Override
  public CompletableSource apply(Integer v) throws Exception {
    return Flowable.range(1, 100).subscribeOn(Schedulers.computation()).ignoreElements();
  }
}, false, 3).toFlowable()

代码示例来源:origin: ReactiveX/RxJava

@Override
  public CompletableSource apply(Integer v) throws Exception {
    return Flowable.range(1, 100).subscribeOn(Schedulers.computation()).ignoreElements();
  }
})

代码示例来源:origin: ReactiveX/RxJava

@Test
public void blockingFirstDefault() {
  assertEquals(1, Flowable.<Integer>empty()
      .subscribeOn(Schedulers.computation()).blockingFirst(1).intValue());
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void blockingFirst() {
  assertEquals(1, Flowable.range(1, 10)
      .subscribeOn(Schedulers.computation()).blockingFirst().intValue());
}

代码示例来源:origin: ReactiveX/RxJava

@Test(timeout = 2000)
public void testRepeatTake() {
  Flowable<Integer> xs = Flowable.just(1, 2);
  Object[] ys = xs.repeat().subscribeOn(Schedulers.newThread()).take(4).toList().blockingGet().toArray();
  assertArrayEquals(new Object[] { 1, 2, 1, 2 }, ys);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void inputOutputSubscribeRace2() {
  Flowable<Integer> source = Flowable.just(1).subscribeOn(Schedulers.single())
      .publish(Functions.<Flowable<Integer>>identity());
  for (int i = 0; i < 500; i++) {
    source.test()
    .awaitDone(5, TimeUnit.SECONDS)
    .assertResult(1);
  }
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void longSequenceEquals() {
  Flowable<Integer> source = Flowable.range(1, Flowable.bufferSize() * 4).subscribeOn(Schedulers.computation());
  Flowable.sequenceEqual(source, source)
  .test()
  .awaitDone(5, TimeUnit.SECONDS)
  .assertResult(true);
}

代码示例来源:origin: ReactiveX/RxJava

@Test(timeout = 1000)
public void testRaceConditions() {
  Scheduler comp = Schedulers.computation();
  Scheduler limited = comp.when(new Function<Flowable<Flowable<Completable>>, Completable>() {
    @Override
    public Completable apply(Flowable<Flowable<Completable>> t) {
      return Completable.merge(Flowable.merge(t, 10));
    }
  });
  merge(just(just(1).subscribeOn(limited).observeOn(comp)).repeat(1000)).blockingSubscribe();
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void repeatScheduled() {
  TestSubscriber<Integer> ts = TestSubscriber.create();
  Flowable.just(1).subscribeOn(Schedulers.computation()).repeat(5).subscribe(ts);
  ts.awaitTerminalEvent(5, TimeUnit.SECONDS);
  ts.assertValues(1, 1, 1, 1, 1);
  ts.assertNoErrors();
  ts.assertComplete();
}

相关文章

Flowable类方法