Usage and code examples of the io.reactivex.Flowable.scan() method

x33g5p2x, reposted on 2022-01-19 in Other

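This article collects code examples for the Java method io.reactivex.Flowable.scan() and shows how Flowable.scan() is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they should be useful as references. The details of Flowable.scan() are as follows:
Package path: io.reactivex.Flowable
Class name: Flowable
Method name: scan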

Introduction to Flowable.scan

Returns a Flowable that applies a specified accumulator function to the first item emitted by a source Publisher, then feeds the result of that function along with the second item emitted by the source Publisher into the same function, and so on until all items have been emitted by the source Publisher, emitting the result of each of these iterations.

This sort of function is sometimes called an accumulator.

Backpressure: The operator honors downstream backpressure and expects the source Publisher to honor backpressure as well. If this expectation is violated, a MissingBackpressureException may be signaled somewhere downstream.

Scheduler: scan does not operate by default on a particular Scheduler.
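To make the description above concrete, here is a minimal plain-Java sketch of the running accumulation that scan performs when no seed is given. It models the operator on a List instead of a real Publisher, so it ignores backpressure and asynchrony. The class name ScanSketch and its scan method are hypothetical and are not part of RxJava.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BinaryOperator;

class ScanSketch {

    // Hypothetical helper: a synchronous model of scan without a seed.
    public static <T> List<T> scan(List<T> source, BinaryOperator<T> accumulator) {
        List<T> emitted = new ArrayList<>();
        boolean first = true;
        T acc = null;
        for (T item : source) {
            if (first) {
                acc = item;                          // the first item is emitted unchanged
                first = false;
            } else {
                acc = accumulator.apply(acc, item);  // previous result + next item
            }
            emitted.add(acc);                        // every intermediate result is emitted
        }
        return emitted;
    }

    public static void main(String[] args) {
        System.out.println(scan(Arrays.asList(1, 2, 3), Integer::sum)); // prints [1, 3, 6]
    }
}
```

The output matches what the RxJava tests further down assert for Flowable.just(1, 2, 3).scan(SUM), namely the values 1, 3, 6.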

Code examples

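Code example source: origin: ReactiveX/RxJava

// Excerpt: presumably the body of an anonymous Function<Flowable<Object>, Flowable<Object>>.
// The enclosing call is cut off in the source, so the trailing "});" closes it.
@Override
public Flowable<Object> apply(Flowable<Object> f) throws Exception {
  // scan without a seed; the accumulator simply keeps the previous value
  return f.scan(new BiFunction<Object, Object, Object>() {
    @Override
    public Object apply(Object a, Object b) throws Exception {
      return a;
    }
  });
}
});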

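Code example source: origin: ReactiveX/RxJava

// Excerpt: presumably the body of an anonymous Function<Flowable<Object>, Flowable<Object>>.
// The enclosing call is cut off in the source, so the trailing "});" closes it.
@Override
public Flowable<Object> apply(Flowable<Object> f) throws Exception {
  // scan with the seed 0; the accumulator simply keeps the previous value
  return f.scan(0, new BiFunction<Object, Object, Object>() {
    @Override
    public Object apply(Object a, Object b) throws Exception {
      return a;
    }
  });
}
});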

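Code example source: origin: ReactiveX/RxJava

// just1 is a field of the test class that is not shown here (presumably a Flowable emitting the single item 1)
@Test(expected = NullPointerException.class)
public void scanSeedFunctionNull() {
  just1.scan(1, null);
}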

Code example source: origin: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void scanSeedNull() {
  just1.scan(null, new BiFunction<Object, Integer, Object>() {
    @Override
    public Object apply(Object a, Integer b) {
      return 1;
    }
  });
}

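Code example source: origin: ReactiveX/RxJava

// Excerpt from a class whose opening lines are cut off in the source;
// the final "}" closes that class.
@Override
public Publisher<Integer> createPublisher(long elements) {
  // running sum over 0, 1, ..., elements - 1
  return Flowable.range(0, (int) elements)
      .scan(new BiFunction<Integer, Integer, Integer>() {
        @Override
        public Integer apply(Integer a, Integer b) throws Exception {
          return a + b;
        }
      });
}
}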

Code example source: origin: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void scanFunctionNull() {
  just1.scan(null);
}

Code example source: origin: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void scanSeedFunctionReturnsNull() {
  just1.scan(1, new BiFunction<Integer, Integer, Integer>() {
    @Override
    public Integer apply(Integer a, Integer b) {
      return null;
    }
  }).blockingSubscribe();
}

Code example source: origin: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void scanFunctionReturnsNull() {
  Flowable.just(1, 1).scan(new BiFunction<Integer, Integer, Integer>() {
    @Override
    public Integer apply(Integer a, Integer b) {
      return null;
    }
  }).blockingSubscribe();
}
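The null-related tests above suggest two distinct failure points: a null seed or a null accumulator is rejected as soon as scan is called, while an accumulator that returns null only fails once items are processed. The following plain-Java sketch models that split on a List. It is an assumption-laden illustration, not RxJava's implementation; the class ScanNullSketch, its scan method, and the exception messages are all hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.function.BiFunction;

class ScanNullSketch {

    // Hypothetical helper: a synchronous model of seeded scan with null checks.
    public static <T, R> List<R> scan(R seed, List<T> source, BiFunction<R, T, R> accumulator) {
        // Argument checks happen up front, before any item is looked at.
        Objects.requireNonNull(seed, "seed is null");
        Objects.requireNonNull(accumulator, "accumulator is null");

        List<R> emitted = new ArrayList<>();
        R acc = seed;
        emitted.add(acc);
        for (T item : source) {
            // A null result is only detected while processing an item.
            acc = Objects.requireNonNull(accumulator.apply(acc, item), "accumulator returned null");
            emitted.add(acc);
        }
        return emitted;
    }

    public static void main(String[] args) {
        try {
            scan(1, Arrays.asList(1), (a, b) -> null);
        } catch (NullPointerException ex) {
            System.out.println("NPE: " + ex.getMessage()); // prints NPE: accumulator returned null
        }
    }
}
```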

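Code example source: origin: ReactiveX/RxJava

// SUM is a constant of the test class that is not shown here (presumably a BiFunction that adds two Integers)
@Test
public void scanEmptyBackpressured() {
  Flowable.<Integer>empty()
  .scan(0, SUM)
  .test(1)
  .assertResult(0);
}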

Code example source: origin: ReactiveX/RxJava

// Burst is a test helper that is not shown here; it appears to build a Publisher that emits the given items
@Test
public void testScanWithSeedDoesNotProcessOnNextAfterTerminalEventIfScanFunctionThrows() {
  final RuntimeException e = new RuntimeException();
  final AtomicInteger count = new AtomicInteger();
  Burst.items(1, 2).create().scan(0, new BiFunction<Integer, Integer, Integer>() {
    @Override
    public Integer apply(Integer n1, Integer n2) throws Exception {
      count.incrementAndGet();
      throw e;
    }
  })
  .test()
  .assertValues(0)
  .assertError(e);
  // the accumulator ran exactly once: the second item was not processed after the error
  assertEquals(1, count.get());
}

Code example source: origin: ReactiveX/RxJava

// Burst is a test helper that is not shown here; it appears to build a Publisher that emits the given items
@Test
public void testScanNoSeedDoesNotProcessOnNextAfterTerminalEventIfScanFunctionThrows() {
  final RuntimeException e = new RuntimeException();
  final AtomicInteger count = new AtomicInteger();
  Burst.items(1, 2, 3).create().scan(new BiFunction<Integer, Integer, Integer>() {
    @Override
    public Integer apply(Integer n1, Integer n2) throws Exception {
      count.incrementAndGet();
      throw e;
    }
  })
  .test()
  .assertValue(1)
  .assertError(e);
  // the accumulator ran exactly once: the third item was not processed after the error
  assertEquals(1, count.get());
}
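The two tests above check that once the accumulator throws, the values emitted so far are kept, the error is delivered, and no further items reach the accumulator. A minimal plain-Java sketch of that behavior for the seeded case might look as follows. It works on a List and is only a model under those assumptions; the class ScanErrorSketch, its nested Result type, and the scan method are hypothetical names, not RxJava API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;

class ScanErrorSketch {

    // Hypothetical holder: values emitted before termination plus the terminal error, if any.
    static final class Result<R> {
        final List<R> values = new ArrayList<>();
        Throwable error; // null means the source completed normally
    }

    // Hypothetical helper: a synchronous model of seeded scan that stops at the first failure.
    public static <T, R> Result<R> scan(R seed, List<T> source, BiFunction<R, T, R> accumulator) {
        Result<R> result = new Result<>();
        R acc = seed;
        result.values.add(acc);              // the seed is emitted first
        for (T item : source) {
            try {
                acc = accumulator.apply(acc, item);
            } catch (RuntimeException ex) {
                result.error = ex;           // terminal event
                return result;               // remaining items are never processed
            }
            result.values.add(acc);
        }
        return result;
    }

    public static void main(String[] args) {
        RuntimeException e = new RuntimeException("boom");
        AtomicInteger count = new AtomicInteger();
        Result<Integer> r = scan(0, Arrays.asList(1, 2), (a, b) -> {
            count.incrementAndGet();
            throw e;
        });
        System.out.println(r.values + " " + (r.error == e) + " " + count.get()); // prints [0] true 1
    }
}
```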

Code example source: origin: ReactiveX/RxJava

@Test
public void scanErrorBackpressured() {
  Flowable.<Integer>error(new TestException())
  .scan(0, SUM)
  .test(0)
  .assertFailure(TestException.class);
}

Code example source: origin: ReactiveX/RxJava

@Test
public void error() {
  Flowable.error(new TestException())
  .scan(new BiFunction<Object, Object, Object>() {
    @Override
    public Object apply(Object a, Object b) throws Exception {
      return a;
    }
  })
  .test()
  .assertFailure(TestException.class);
}

Code example source: origin: ReactiveX/RxJava

@Test
public void testScanWithSeedCompletesNormally() {
  Flowable.just(1, 2, 3).scan(0, SUM)
   .test()
   .assertValues(0, 1, 3, 6)
   .assertComplete();
}
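The test above shows the seeded variant emitting one more value than the source has items: the seed comes first, followed by each accumulated result. As a rough illustration, the same sequence can be reproduced with a plain-Java model on a List. The class SeededScanSketch and its scan method are hypothetical and only approximate the operator's synchronous behavior.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.BiFunction;

class SeededScanSketch {

    // Hypothetical helper: a synchronous model of scan with a seed.
    public static <T, R> List<R> scan(R seed, List<T> source, BiFunction<R, T, R> accumulator) {
        List<R> emitted = new ArrayList<>();
        R acc = seed;
        emitted.add(acc);                        // the seed itself is the first emission
        for (T item : source) {
            acc = accumulator.apply(acc, item);  // fold the next item into the running result
            emitted.add(acc);
        }
        return emitted;
    }

    public static void main(String[] args) {
        System.out.println(scan(0, Arrays.asList(1, 2, 3), Integer::sum));          // prints [0, 1, 3, 6]
        System.out.println(scan(0, Collections.<Integer>emptyList(), Integer::sum)); // prints [0]
    }
}
```

The second call mirrors the scanEmptyBackpressured test, where an empty source with the seed 0 still yields the single value 0.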

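Code example source: origin: ReactiveX/RxJava

// throwingBiFunction is a helper of the test class that is not shown here
// (presumably it returns a BiFunction that always throws the given exception)
@Test
public void testScanWithSeedDoesNotEmitTerminalEventTwiceIfScanFunctionThrows() {
  final RuntimeException e = new RuntimeException();
  Burst.item(1).create()
   .scan(0, throwingBiFunction(e))
   .test()
   .assertValue(0)
   .assertError(e);
}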

Code example source: origin: ReactiveX/RxJava

@Test
public void testScanNoSeedDoesNotEmitTerminalEventTwiceIfScanFunctionThrows() {
  final RuntimeException e = new RuntimeException();
  Burst.items(1, 2).create()
   .scan(throwingBiFunction(e))
   .test()
   .assertValue(1)
   .assertError(e);
}

Code example source: origin: ReactiveX/RxJava

@Test
public void testScanNoSeed() {
  Flowable.just(1, 2, 3)
    .scan(SUM)
    .test()
    .assertValues(1, 3, 6)
    .assertComplete();
}

Code example source: origin: ReactiveX/RxJava

@Test
public void neverSource() {
  Flowable.<Integer>never()
  .scan(0, new BiFunction<Integer, Integer, Integer>() {
    @Override
    public Integer apply(Integer a, Integer b) throws Exception {
      return a + b;
    }
  })
  .test()
  .assertValue(0)
  .assertNoErrors()
  .assertNotComplete();
}

Code example source: origin: ReactiveX/RxJava

@Test
public void testScanWithRequestOne() {
  Flowable<Integer> f = Flowable.just(1, 2).scan(0, new BiFunction<Integer, Integer, Integer>() {
    @Override
    public Integer apply(Integer t1, Integer t2) {
      return t1 + t2;
    }
  }).take(1);
  TestSubscriber<Integer> subscriber = new TestSubscriber<Integer>();
  f.subscribe(subscriber);
  subscriber.assertValue(0);
  subscriber.assertTerminated();
  subscriber.assertNoErrors();
}

Code example source: origin: ReactiveX/RxJava

@Test
@Ignore("scanSeed no longer emits without upstream signal")
public void testInitialValueEmittedWithProducer() {
  Flowable<Integer> source = Flowable.never();
  TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
  source.scan(0, new BiFunction<Integer, Integer, Integer>() {
    @Override
    public Integer apply(Integer t1, Integer t2) {
      return t1 + t2;
    }
  }).subscribe(ts);
  ts.assertNoErrors();
  ts.assertNotComplete();
  ts.assertValue(0);
}
