Usage and code examples of the io.reactivex.Flowable.onBackpressureLatest() method

x33g5p2x, reposted on 2022-01-19 in Other

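This article collects code examples of the Java method io.reactivex.Flowable.onBackpressureLatest() and shows how Flowable.onBackpressureLatest() is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they should serve as useful references. Details of the Flowable.onBackpressureLatest() method are as follows:
Package path: io.reactivex.Flowable
Class name: Flowable
Method name: onBackpressureLatest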

Introduction to Flowable.onBackpressureLatest

Instructs a Publisher that is emitting items faster than its Subscriber can consume them to hold onto the latest value and emit that on request.

Its behavior is logically equivalent to blockingLatest() with the exception that the downstream is not blocking while requesting more values.

Note that if the upstream Publisher does support backpressure, this operator ignores that capability and doesn't propagate any backpressure requests from downstream.

Note that due to the nature of how backpressure requests are propagated through subscribeOn/observeOn, requesting more than 1 from downstream doesn't guarantee a continuous delivery of onNext events.

Backpressure: The operator honors backpressure from downstream and consumes the source Publisher in an unbounded manner (i.e., not applying backpressure to it).

Scheduler: onBackpressureLatest does not operate by default on a particular Scheduler.
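To make the "hold onto the latest value and emit that on request" behavior concrete, here is a minimal sketch in plain Java with no RxJava dependency. It is a single-threaded toy model, not RxJava's implementation: the class `LatestSlotSketch` and its methods `onNext`, `request`, and `demo` are hypothetical names chosen for illustration, and completion, errors, and thread safety are deliberately left out.

```java
import java.util.ArrayList;
import java.util.List;

// Toy, single-threaded model of keep-latest backpressure.
// All names here are hypothetical; this is not RxJava code.
class LatestSlotSketch {
    private Integer latest;      // the single retained item, or null if none
    private long requested;      // outstanding downstream demand
    private final List<Integer> delivered = new ArrayList<>();

    // Upstream pushes without regard to demand; a newer item overwrites an older one.
    void onNext(int item) {
        latest = item;
        drain();
    }

    // Downstream signals that it can accept n more items.
    void request(long n) {
        requested += n;
        drain();
    }

    // Deliver the retained item only while there is demand.
    private void drain() {
        while (requested > 0 && latest != null) {
            delivered.add(latest);
            latest = null;
            requested--;
        }
    }

    static List<Integer> demo() {
        LatestSlotSketch slot = new LatestSlotSketch();
        slot.request(2);                 // demand for two items
        for (int i = 1; i <= 5; i++) {
            slot.onNext(i);              // 1 and 2 pass through; 3 and 4 are overwritten
        }
        slot.request(1);                 // the retained 5 is delivered now
        return slot.delivered;
    }

    public static void main(String[] args) {
        System.out.println(demo());      // prints [1, 2, 5]
    }
}
```

With a demand of 2, items 1 and 2 pass straight through. Items 3 and 4 are overwritten while there is no demand, and the retained 5 is delivered as soon as one more item is requested.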

Code examples

Code example source: ReactiveX/RxJava

// Excerpt: the enclosing anonymous class and method call are cut off in the source.
@Override
public Publisher<Object> apply(Flowable<Object> f) throws Exception {
  return f.onBackpressureLatest();
}
});

Code example source: ReactiveX/RxJava

@Override
public Publisher<Long> createPublisher(final long elements) {
  return
      Flowable.timer(1, TimeUnit.MILLISECONDS)
      .onBackpressureLatest()
    ;
}

Code example source: ReactiveX/RxJava

@Test
public void badRequest() {
  TestHelper.assertBadRequestReported(Flowable.never().onBackpressureLatest());
}

Code example source: TeamNewPipe/NewPipe

@Override
public void startLoading(boolean forceLoad) {
  super.startLoading(forceLoad);
  if (disposables != null) disposables.clear();
  disposables.add(getDebouncedSaver());
  isLoadingComplete.set(false);
  isModified.set(false);
  playlistManager.getPlaylistStreams(playlistId)
      .onBackpressureLatest()
      .observeOn(AndroidSchedulers.mainThread())
      .subscribe(getPlaylistObserver());
}

Code example source: ReactiveX/RxJava

@Test
public void dispose() {
  TestHelper.checkDisposed(Flowable.never().onBackpressureLatest());
}

Code example source: TeamNewPipe/NewPipe

@Override
public void startLoading(boolean forceLoad) {
  super.startLoading(forceLoad);
  Flowable.combineLatest(
      localPlaylistManager.getPlaylists(),
      remotePlaylistManager.getPlaylists(),
      BookmarkFragment::merge
  ).onBackpressureLatest()
   .observeOn(AndroidSchedulers.mainThread())
   .subscribe(getPlaylistsSubscriber());
}

Code example source: ReactiveX/RxJava

@Test
public void take() {
  Flowable.just(1, 2)
  .onBackpressureLatest()
  .take(1)
  .test()
  .assertResult(1);
}

Code example source: ReactiveX/RxJava

@Override
public Publisher<List<Long>> createPublisher(long elements) {
  return
    Flowable.fromIterable(iterate(elements))
    .buffer(Flowable.just(1).concatWith(Flowable.<Integer>never()))
    .onBackpressureLatest()
  ;
}

Code example source: ReactiveX/RxJava

@Test
public void testAsynchronousDrop() throws InterruptedException {
  TestSubscriber<Integer> ts = new TestSubscriber<Integer>(1L) {
    final Random rnd = new Random();
    @Override
    public void onNext(Integer t) {
      super.onNext(t);
      if (rnd.nextDouble() < 0.001) {
        try {
          Thread.sleep(1);
        } catch (InterruptedException ex) {
          ex.printStackTrace();
        }
      }
      request(1);
    }
  };
  int m = 100000;
  Flowable.range(1, m)
  .subscribeOn(Schedulers.computation())
  .onBackpressureLatest()
  .observeOn(Schedulers.io())
  .subscribe(ts);
  ts.awaitTerminalEvent(2, TimeUnit.SECONDS);
  ts.assertTerminated();
  int n = ts.values().size();
  System.out.println("testAsynchronousDrop -> " + n);
  Assert.assertTrue("All events received?", n < m);
}

Code example source: ReactiveX/RxJava

// Excerpt from a switch statement; the preceding case label is cut off in the source.
return f.onBackpressureDrop();
case LATEST:
  return f.onBackpressureLatest();
case MISSING:
  return f;
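The excerpt above chooses between onBackpressureDrop() and onBackpressureLatest(). The difference between the two can be sketched in plain Java, again as a single-threaded toy model rather than RxJava's implementation. The class `DropVsLatestSketch`, its `Mode` enum, and the `run` helper are hypothetical names used only for this illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Toy, single-threaded comparison of drop vs. keep-latest behavior.
// All names here are hypothetical; this is not RxJava code.
class DropVsLatestSketch {
    enum Mode { DROP, LATEST }

    private final Mode mode;
    private Integer pending;     // retained item, used only in LATEST mode
    private long requested;      // outstanding downstream demand
    private final List<Integer> delivered = new ArrayList<>();

    DropVsLatestSketch(Mode mode) {
        this.mode = mode;
    }

    void onNext(int item) {
        if (requested > 0) {
            requested--;
            delivered.add(item);         // demand available: pass the item through
        } else if (mode == Mode.LATEST) {
            pending = item;              // no demand: remember only the newest item
        }
        // DROP with no demand: the item is simply discarded
    }

    void request(long n) {
        requested += n;
        if (pending != null && requested > 0) {
            requested--;
            delivered.add(pending);      // LATEST: hand over the retained item
            pending = null;
        }
    }

    static List<Integer> run(Mode mode) {
        DropVsLatestSketch s = new DropVsLatestSketch(mode);
        s.request(2);
        for (int i = 1; i <= 5; i++) {
            s.onNext(i);
        }
        s.request(1);
        s.onNext(6);
        return s.delivered;
    }

    public static void main(String[] args) {
        System.out.println("DROP   -> " + run(Mode.DROP));    // prints DROP   -> [1, 2, 6]
        System.out.println("LATEST -> " + run(Mode.LATEST));  // prints LATEST -> [1, 2, 5]
    }
}
```

In this model the drop variant has nothing to deliver when demand returns, so the next incoming item (6) is delivered. The keep-latest variant delivers the retained 5 immediately and then holds 6 until more is requested.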

Code example source: ReactiveX/RxJava

@Test
public void testSimple() {
  TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
  Flowable.range(1, 5).onBackpressureLatest().subscribe(ts);
  ts.assertNoErrors();
  ts.assertTerminated();
  ts.assertValues(1, 2, 3, 4, 5);
}

Code example source: ReactiveX/RxJava

@Test
public void testSimpleBackpressure() {
  TestSubscriber<Integer> ts = new TestSubscriber<Integer>(2L);
  Flowable.range(1, 5).onBackpressureLatest().subscribe(ts);
  ts.assertNoErrors();
  ts.assertValues(1, 2);
  ts.assertNotComplete();
}

Code example source: TeamNewPipe/NewPipe

.onBackpressureLatest()
.observeOn(AndroidSchedulers.mainThread())
.subscribe(getPlaylistBookmarkSubscriber());

Code example source: redisson/redisson

// Excerpt from a switch statement; the preceding case label is cut off in the source.
return o.onBackpressureDrop();
case LATEST:
  return o.onBackpressureLatest();
case MISSING:
  return o;

Code example source: ReactiveX/RxJava

@Test
public void testSimpleError() {
  TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
  Flowable.range(1, 5).concatWith(Flowable.<Integer>error(new TestException()))
  .onBackpressureLatest().subscribe(ts);
  ts.assertTerminated();
  ts.assertError(TestException.class);
  ts.assertValues(1, 2, 3, 4, 5);
}

Code example source: akarnokd/akarnokd-misc

public static void main(String[] args) throws IOException, InterruptedException {
  Flowable<Long> flowA = Flowable.interval(100, TimeUnit.MILLISECONDS);
  Flowable<Long> flowB = Flowable.interval(200, TimeUnit.MILLISECONDS);

  Flowable<LongCouple> combined = Flowable.combineLatest(
      Arrays.asList(flowA.onBackpressureLatest(), flowB.onBackpressureLatest()),
      a -> new LongCouple((Long) a[0], (Long) a[1]),
      1)
      .onBackpressureLatest()
      .observeOn(Schedulers.newThread(), false, 1);
  combined.subscribe(longCouple -> {
    System.out.println(longCouple.aLong + ":" + longCouple.bLong);
    Thread.sleep(1000);
  });
  // Keep the main thread alive while the interval sources emit on background threads.
  Thread.sleep(10000000);
}

Code example source: org.apache.camel/camel-rxjava2

synchronized void attach(ReactiveStreamsProducer producer) {
  Objects.requireNonNull(producer, "producer cannot be null, use the detach method");
  if (this.camelProducer != null) {
    throw new IllegalStateException("A producer is already attached to the stream '" + name + "'");
  }
  if (this.camelProducer != producer) {
    detach();
    ReactiveStreamsBackpressureStrategy strategy = producer.getEndpoint().getBackpressureStrategy();
    Flowable<Exchange> flow = Flowable.create(camelEmitter::set, BackpressureStrategy.MISSING);
    if (ObjectHelper.equal(strategy, ReactiveStreamsBackpressureStrategy.OLDEST)) {
      flow.onBackpressureDrop(this::onBackPressure)
        .doAfterNext(this::onItemEmitted)
        .subscribe(this.publisher);
    } else if (ObjectHelper.equal(strategy, ReactiveStreamsBackpressureStrategy.LATEST)) {
      flow.doAfterNext(this::onItemEmitted)
        .onBackpressureLatest()
        .subscribe(this.publisher);
    } else {
      flow.doAfterNext(this::onItemEmitted)
        .onBackpressureBuffer()
        .subscribe(this.publisher);
    }
    camelProducer = producer;
  }
}

Code example source: akarnokd/akarnokd-misc

@Test
public void test() throws Exception {
  for (int i = 0; i < 1000; i++) {
    TestSubscriber<Integer> ts = new TestSubscriber<>();
    ts.onSubscribe(new BooleanSubscription());
    Flowable.range(1, 100)
        .onBackpressureLatest()
        .observeOn(Schedulers.io(), false, 1)
        .subscribe(v -> {
          Thread.sleep(100);
          ts.onNext(v);
        }, ts::onError, ts::onComplete);

    ts.awaitDone(5, TimeUnit.SECONDS)
        .assertResult(1, 100);
  }
  Thread.sleep(1000);
}

Code example source: akarnokd/akarnokd-misc

public static void main(String[] args) {
  Flowable<Integer> obs = Flowable.create(emitter -> {
    for (int i = 1; i < 1000; i++) {
      if (i % 5 == 0) {
        sleep(300L);
      }

      emitter.onNext(i);
    }

    emitter.onComplete();
  }, BackpressureStrategy.MISSING);

  obs.subscribeOn(Schedulers.computation())
      .onBackpressureLatest()
      .observeOn(Schedulers.computation(), false, 64)
      .blockingSubscribe(new Consumer<Integer>() {
        int i;
        @Override
        public void accept(Integer value) throws Exception {
          System.out.println((++i) + " Received " + value);
        }
      }); // Why does this get stuck at "Received 128"

  // sleep(10000L);
}
