io.reactivex.Flowable.withLatestFrom()方法的使用及代码示例

x33g5p2x  于2022-01-19 转载在 其他  
字(8.8k)|赞(0)|评价(0)|浏览(133)

本文整理了Java中io.reactivex.Flowable.withLatestFrom()方法的一些代码示例,展示了Flowable.withLatestFrom()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Flowable.withLatestFrom()方法的具体详情如下:
包路径:io.reactivex.Flowable
类名称:Flowable
方法名:withLatestFrom

Flowable.withLatestFrom介绍

[英]Combines the value emission from this Publisher with the latest emissions from the other Publishers via a function to produce the output item.

Note that this operator doesn't emit anything until all other sources have produced at least one value. The resulting emission only happens when this Publisher emits (and not when any of the other sources emit, unlike combineLatest). If a source doesn't produce any value and just completes, the sequence is completed immediately. Backpressure: This operator is a pass-through for backpressure behavior between the source Publisherand the downstream Subscriber. The other Publishers are consumed in an unbounded manner. Scheduler: This operator does not operate by default on a particular Scheduler.
[中]通过一个函数将此发布者的值发布与其他发布者的最新发布相结合,以生成输出项。
请注意,在所有其他源生成至少一个值之前,此运算符不会发出任何内容。生成的发射仅在该发布服务器发射时发生(与CombineTest不同,在任何其他源发射时不发生)。如果一个源没有产生任何值,只是完成了,那么序列将立即完成。背压:此运算符是源发布服务器和下游订阅服务器之间背压行为的传递。其他出版商以无限的方式被消费。调度器:默认情况下,此操作员不会对特定的调度器进行操作。

代码示例

代码示例来源:origin: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void withLatestFromOtherNull() {
  just1.withLatestFrom(null, new BiFunction<Integer, Object, Object>() {
    @Override
    public Object apply(Integer a, Object b) {
      return 1;
    }
  });
}

代码示例来源:origin: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void withLatestFromCombinerNull() {
  just1.withLatestFrom(just1, null);
}

代码示例来源:origin: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void withLatestFromCombinerReturnsNull() {
  just1.withLatestFrom(just1, new BiFunction<Integer, Integer, Object>() {
    @Override
    public Object apply(Integer a, Integer b) {
      return null;
    }
  }).blockingSubscribe();
}

代码示例来源:origin: ReactiveX/RxJava

@Override
  public Publisher<Integer> createPublisher(long elements) {
    return
        Flowable.range(0, (int)elements)
        .withLatestFrom(Flowable.just(1), new BiFunction<Integer, Integer, Integer>() {
          @Override
          public Integer apply(Integer a, Integer b) throws Exception {
            return a + b;
          }
        })
    ;
  }
}

代码示例来源:origin: ReactiveX/RxJava

@SuppressWarnings("unchecked")
@Test
public void combineToNull2() {
  Flowable.just(1)
  .withLatestFrom(Arrays.asList(Flowable.just(2), Flowable.just(3)), new Function<Object[], Object>() {
    @Override
    public Object apply(Object[] o) throws Exception {
      return null;
    }
  })
  .test()
  .assertFailure(NullPointerException.class);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void dispose() {
  TestHelper.checkDisposed(Flowable.just(1).withLatestFrom(Flowable.just(2), new BiFunction<Integer, Integer, Object>() {
    @Override
    public Object apply(Integer a, Integer b) throws Exception {
      return a;
    }
  }));
  TestHelper.checkDisposed(Flowable.just(1).withLatestFrom(Flowable.just(2), Flowable.just(3), new Function3<Integer, Integer, Integer, Object>() {
    @Override
    public Object apply(Integer a, Integer b, Integer c) throws Exception {
      return a;
    }
  }));
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void coldSourceConsumedWithoutOther() {
  Flowable.range(1, 10).withLatestFrom(Flowable.never(),
  new BiFunction<Integer, Object, Object>() {
    @Override
    public Object apply(Integer a, Object b) throws Exception {
      return a;
    }
  })
  .test(1)
  .assertResult();
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void combineToNull1() {
  Flowable.just(1)
  .withLatestFrom(Flowable.just(2), new BiFunction<Integer, Integer, Object>() {
    @Override
    public Object apply(Integer a, Integer b) throws Exception {
      return null;
    }
  })
  .test()
  .assertFailure(NullPointerException.class);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void zeroOtherCombinerReturnsNull() {
  Flowable.just(1)
  .withLatestFrom(new Flowable[0], Functions.justFunction(null))
  .test()
  .assertFailureAndMessage(NullPointerException.class, "The combiner returned a null value");
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void with4Others() {
  Flowable<Integer> just = Flowable.just(1);
  TestSubscriber<List<Integer>> ts = new TestSubscriber<List<Integer>>();
  just.withLatestFrom(just, just, just, just, new Function5<Integer, Integer, Integer, Integer, Integer, List<Integer>>() {
    @Override
    public List<Integer> apply(Integer a, Integer b, Integer c, Integer d, Integer e) {
      return Arrays.asList(a, b, c, d, e);
    }
  })
  .subscribe(ts);
  ts.assertValue(Arrays.asList(1, 1, 1, 1, 1));
  ts.assertNoErrors();
  ts.assertComplete();
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void with3Others() {
  Flowable<Integer> just = Flowable.just(1);
  TestSubscriber<List<Integer>> ts = new TestSubscriber<List<Integer>>();
  just.withLatestFrom(just, just, just, new Function4<Integer, Integer, Integer, Integer, List<Integer>>() {
    @Override
    public List<Integer> apply(Integer a, Integer b, Integer c, Integer d) {
      return Arrays.asList(a, b, c, d);
    }
  })
  .subscribe(ts);
  ts.assertValue(Arrays.asList(1, 1, 1, 1));
  ts.assertNoErrors();
  ts.assertComplete();
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void manyIteratorThrows() {
  Flowable.just(1)
  .withLatestFrom(new CrashingMappedIterable<Flowable<Integer>>(1, 100, 100, new Function<Integer, Flowable<Integer>>() {
    @Override
    public Flowable<Integer> apply(Integer v) throws Exception {
      return Flowable.just(2);
    }
  }), new Function<Object[], Object>() {
    @Override
    public Object apply(Object[] a) throws Exception {
      return a;
    }
  })
  .test()
  .assertFailureAndMessage(TestException.class, "iterator()");
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void with2Others() {
  Flowable<Integer> just = Flowable.just(1);
  TestSubscriber<List<Integer>> ts = new TestSubscriber<List<Integer>>();
  just.withLatestFrom(just, just, new Function3<Integer, Integer, Integer, List<Integer>>() {
    @Override
    public List<Integer> apply(Integer a, Integer b, Integer c) {
      return Arrays.asList(a, b, c);
    }
  })
  .subscribe(ts);
  ts.assertValue(Arrays.asList(1, 1, 1));
  ts.assertNoErrors();
  ts.assertComplete();
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void manyCombinerThrows() {
  Flowable.just(1).withLatestFrom(Flowable.just(2), Flowable.just(3), new Function3<Integer, Integer, Integer, Object>() {
    @Override
    public Object apply(Integer a, Integer b, Integer c) throws Exception {
      throw new TestException();
    }
  })
  .test()
  .assertFailure(TestException.class);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void coldSourceConsumedWithoutManyOthers() {
  Flowable.range(1, 10).withLatestFrom(Flowable.never(), Flowable.never(), Flowable.never(),
  new Function4<Integer, Object, Object, Object, Object>() {
    @Override
    public Object apply(Integer a, Object b, Object c, Object d) throws Exception {
      return a;
    }
  })
  .test(1)
  .assertResult();
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void withEmpty() {
  TestSubscriber<String> ts = new TestSubscriber<String>(0);
  Flowable.range(1, 3).withLatestFrom(
      new Flowable<?>[] { Flowable.just(1), Flowable.empty() }, toArray)
  .subscribe(ts);
  ts.assertNoValues();
  ts.assertNoErrors();
  ts.assertComplete();
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void withError() {
  TestSubscriber<String> ts = new TestSubscriber<String>(0);
  Flowable.range(1, 3).withLatestFrom(
      new Flowable<?>[] { Flowable.just(1), Flowable.error(new TestException()) }, toArray)
  .subscribe(ts);
  ts.assertNoValues();
  ts.assertError(TestException.class);
  ts.assertNotComplete();
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void withMainError() {
  TestSubscriber<String> ts = new TestSubscriber<String>(0);
  Flowable.error(new TestException()).withLatestFrom(
      new Flowable<?>[] { Flowable.just(1), Flowable.just(1) }, toArray)
  .subscribe(ts);
  ts.assertNoValues();
  ts.assertError(TestException.class);
  ts.assertNotComplete();
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void backpressureNoSignal() {
  PublishProcessor<String> pp1 = PublishProcessor.create();
  PublishProcessor<String> pp2 = PublishProcessor.create();
  TestSubscriber<String> ts = new TestSubscriber<String>(0);
  Flowable.range(1, 10).withLatestFrom(new Flowable<?>[] { pp1, pp2 }, toArray)
  .subscribe(ts);
  ts.assertNoValues();
  ts.request(1);
  ts.assertNoValues();
  ts.assertNoErrors();
  ts.assertComplete();
  assertFalse("ps1 has subscribers?", pp1.hasSubscribers());
  assertFalse("ps2 has subscribers?", pp2.hasSubscribers());
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void backpressureWithSignal() {
  PublishProcessor<String> pp1 = PublishProcessor.create();
  PublishProcessor<String> pp2 = PublishProcessor.create();
  TestSubscriber<String> ts = new TestSubscriber<String>(0);
  Flowable.range(1, 3).withLatestFrom(new Flowable<?>[] { pp1, pp2 }, toArray)
  .subscribe(ts);
  ts.assertNoValues();
  pp1.onNext("1");
  pp2.onNext("1");
  ts.request(1);
  ts.assertValue("[1, 1, 1]");
  ts.request(1);
  ts.assertValues("[1, 1, 1]", "[2, 1, 1]");
  ts.request(1);
  ts.assertValues("[1, 1, 1]", "[2, 1, 1]", "[3, 1, 1]");
  ts.assertNoErrors();
  ts.assertComplete();
  assertFalse("ps1 has subscribers?", pp1.hasSubscribers());
  assertFalse("ps2 has subscribers?", pp2.hasSubscribers());
}

相关文章

Flowable类方法