io.reactivex.Observable.scan(): usage and code examples

Reposted by x33g5p2x on 2022-01-25 under Other

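This article collects code examples for the Java method io.reactivex.Observable.scan() and shows how it is used in practice. The examples come mainly from GitHub, Stack Overflow, and Maven, extracted from selected projects, and are intended as a practical reference. Details of Observable.scan():
Package path: io.reactivex.Observable
Class name: Observable
Method name: scan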

About Observable.scan

Returns an Observable that applies a specified accumulator function to the first item emitted by a source ObservableSource, then feeds the result of that function along with the second item emitted by the source ObservableSource into the same function, and so on until all items have been emitted by the source ObservableSource, emitting the result of each of these iterations. In the overload without a seed, the first source item is emitted unchanged, and the accumulator is first called when the second item arrives.

This sort of function is sometimes called an accumulator.

Scheduler: scan does not operate by default on a particular Scheduler.
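
The accumulation described above can be modeled without RxJava. The following is a minimal plain-Java sketch of the unseeded overload's semantics, applied to a list instead of a stream. The class name ScanSketch and its scan helper are hypothetical names introduced for illustration; this is not how RxJava implements the operator.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BinaryOperator;

// Plain-Java model of unseeded scan semantics (hypothetical helper, not RxJava code).
class ScanSketch {

    // Returns every intermediate accumulation, in the order scan would emit them.
    static <T> List<T> scan(List<T> source, BinaryOperator<T> accumulator) {
        List<T> emitted = new ArrayList<>();
        T acc = null;
        boolean first = true;
        for (T item : source) {
            // The first item passes through unchanged; later items are combined
            // with the previous accumulation.
            acc = first ? item : accumulator.apply(acc, item);
            first = false;
            emitted.add(acc);
        }
        return emitted;
    }

    public static void main(String[] args) {
        System.out.println(scan(Arrays.asList(1, 2, 3), Integer::sum)); // prints [1, 3, 6]
        System.out.println(scan(Arrays.asList(1), Integer::sum));       // prints [1]
    }
}
```

With a single source item the accumulator is never called, which matches the single-value test further down where only 1 is emitted.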

Code examples

Code example source: ReactiveX/RxJava

@Override
  public ObservableSource<Object> apply(Observable<Object> o) throws Exception {
    return o.scan(new BiFunction<Object, Object, Object>() {
      @Override
      public Object apply(Object a, Object b) throws Exception {
        return a;
      }
    });
  }
});

Code example source: ReactiveX/RxJava

@Override
  public Object apply(Observable<Object> o) throws Exception {
    return o.scan(0, new BiFunction<Object, Object, Object>() {
      @Override
      public Object apply(Object a, Object b) throws Exception {
        return a;
      }
    });
  }
}, false, 1, 1, 0, 0);

Code example source: ReactiveX/RxJava

@Override
  public ObservableSource<Object> apply(Observable<Object> o) throws Exception {
    return o.scan(0, new BiFunction<Object, Object, Object>() {
      @Override
      public Object apply(Object a, Object b) throws Exception {
        return a;
      }
    });
  }
});

Code example source: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void scanSeedNull() {
  just1.scan(null, new BiFunction<Object, Integer, Object>() {
    @Override
    public Object apply(Object a, Integer b) {
      return 1;
    }
  });
}

Code example source: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void scanSeedFunctionNull() {
  just1.scan(1, null);
}

Code example source: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void scanFunctionNull() {
  just1.scan(null);
}

Code example source: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void scanFunctionReturnsNull() {
  Observable.just(1, 1).scan(new BiFunction<Integer, Integer, Integer>() {
    @Override
    public Integer apply(Integer a, Integer b) {
      return null;
    }
  }).blockingSubscribe();
}

Code example source: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void scanSeedFunctionReturnsNull() {
  just1.scan(1, new BiFunction<Integer, Integer, Integer>() {
    @Override
    public Integer apply(Integer a, Integer b) {
      return null;
    }
  }).blockingSubscribe();
}

Code example source: ReactiveX/RxJava

@Test
public void testScanIntegersWithoutInitialValueAndOnlyOneValue() {
  Observer<Integer> observer = TestHelper.mockObserver();
  Observable<Integer> o = Observable.just(1);
  Observable<Integer> m = o.scan(new BiFunction<Integer, Integer, Integer>() {
    @Override
    public Integer apply(Integer t1, Integer t2) {
      return t1 + t2;
    }
  });
  m.subscribe(observer);
  verify(observer, never()).onError(any(Throwable.class));
  verify(observer, never()).onNext(0);
  verify(observer, times(1)).onNext(1);
  verify(observer, times(1)).onNext(anyInt());
  verify(observer, times(1)).onComplete();
  verify(observer, never()).onError(any(Throwable.class));
}

Code example source: ReactiveX/RxJava

@Test
public void testScanIntegersWithoutInitialValue() {
  Observer<Integer> observer = TestHelper.mockObserver();
  Observable<Integer> o = Observable.just(1, 2, 3);
  Observable<Integer> m = o.scan(new BiFunction<Integer, Integer, Integer>() {
    @Override
    public Integer apply(Integer t1, Integer t2) {
      return t1 + t2;
    }
  });
  m.subscribe(observer);
  verify(observer, never()).onError(any(Throwable.class));
  verify(observer, never()).onNext(0);
  verify(observer, times(1)).onNext(1);
  verify(observer, times(1)).onNext(3);
  verify(observer, times(1)).onNext(6);
  verify(observer, times(3)).onNext(anyInt());
  verify(observer, times(1)).onComplete();
  verify(observer, never()).onError(any(Throwable.class));
}

Code example source: ReactiveX/RxJava

@Test
public void testScanIntegersWithInitialValue() {
  Observer<String> observer = TestHelper.mockObserver();
  Observable<Integer> o = Observable.just(1, 2, 3);
  Observable<String> m = o.scan("", new BiFunction<String, Integer, String>() {
    @Override
    public String apply(String s, Integer n) {
      return s + n.toString();
    }
  });
  m.subscribe(observer);
  verify(observer, never()).onError(any(Throwable.class));
  verify(observer, times(1)).onNext("");
  verify(observer, times(1)).onNext("1");
  verify(observer, times(1)).onNext("12");
  verify(observer, times(1)).onNext("123");
  verify(observer, times(4)).onNext(anyString());
  verify(observer, times(1)).onComplete();
  verify(observer, never()).onError(any(Throwable.class));
}
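
The test above shows that the seeded overload emits the seed itself before any accumulated value, so three source items produce four emissions, and that the output type may differ from the source type. A minimal plain-Java sketch of that behavior follows; SeededScanSketch and its scan helper are hypothetical names for illustration and do not reflect RxJava's internals.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiFunction;

// Plain-Java model of seeded scan semantics (hypothetical helper, not RxJava code).
class SeededScanSketch {

    // T is the source item type, R the accumulator (and output) type.
    static <T, R> List<R> scan(List<T> source, R seed, BiFunction<R, T, R> accumulator) {
        List<R> emitted = new ArrayList<>();
        R acc = seed;
        emitted.add(acc); // the seed is the first emission
        for (T item : source) {
            acc = accumulator.apply(acc, item);
            emitted.add(acc);
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<String> out = scan(Arrays.asList(1, 2, 3), "", (s, n) -> s + n);
        System.out.println(out);        // prints [, 1, 12, 123]
        System.out.println(out.size()); // prints 4
    }
}
```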

Code example source: ReactiveX/RxJava

@Override
  public Observable<Object> apply(Observable<? extends Throwable> attempts) {
    // Worker w = Schedulers.computation().createWorker();
    return attempts
      .map(new Function<Throwable, Tuple>() {
        @Override
        public Tuple apply(Throwable n) {
          return new Tuple(Long.valueOf(1), n);
        }})
      .scan(new BiFunction<Tuple, Tuple, Tuple>() {
        @Override
        public Tuple apply(Tuple t, Tuple n) {
          return new Tuple(t.count + n.count, n.n);
        }})
      .flatMap(new Function<Tuple, Observable<Long>>() {
        @Override
        public Observable<Long> apply(Tuple t) {
          System.out.println("Retry # " + t.count);
          return t.count > 20 ?
            Observable.<Long>error(t.n) :
            Observable.timer(t.count * 1L, TimeUnit.MILLISECONDS);
      }}).cast(Object.class);
  }
}).subscribe(to);

Code example source: ReactiveX/RxJava

@Test
public void shouldNotEmitUntilAfterSubscription() {
  TestObserver<Integer> to = new TestObserver<Integer>();
  Observable.range(1, 100).scan(0, new BiFunction<Integer, Integer, Integer>() {
    @Override
    public Integer apply(Integer t1, Integer t2) {
      return t1 + t2;
    }
  }).filter(new Predicate<Integer>() {
    @Override
    public boolean test(Integer t1) {
      // this will cause request(1) when 0 is emitted
      return t1 > 0;
    }
  }).subscribe(to);
  assertEquals(100, to.values().size());
}

Code example source: ReactiveX/RxJava

@Test
public void testUnsubscribeScan() throws Exception {
  ObservableEventStream.getEventStream("HTTP-ClusterB", 20)
    .scan(new HashMap<String, String>(), new BiFunction<HashMap<String, String>, Event, HashMap<String, String>>() {
      @Override
      public HashMap<String, String> apply(HashMap<String, String> accum, Event perInstanceEvent) {
        accum.put("instance", perInstanceEvent.instanceId);
        return accum;
      }
    })
    .take(10)
    .blockingForEach(new Consumer<HashMap<String, String>>() {
      @Override
      public void accept(HashMap<String, String> pv) {
        System.out.println(pv);
      }
    });

  Thread.sleep(200); // make sure the event streams receive their interrupt
}
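
The test above mutates the seed HashMap and returns the same instance from every accumulator call. That works when each emission is consumed immediately, as the printing consumer does, but every emitted value is the same object, so values collected for later use all show the final state. The plain-Java sketch below contrasts a mutating accumulator with a copying one. SharedAccumulatorSketch and both helper methods are hypothetical names, and the list-based model is an assumption made for illustration. In RxJava itself, a fresh seed per subscriber can be obtained with scanWith or by wrapping the chain in defer.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of a mutable scan accumulator (hypothetical helper, not RxJava code).
class SharedAccumulatorSketch {

    // Mutates and re-emits one map, like the accumulator in the test above.
    static List<Map<String, String>> scanMutating(List<String> instanceIds) {
        Map<String, String> accum = new HashMap<>();
        List<Map<String, String>> emitted = new ArrayList<>();
        emitted.add(accum); // seed emission
        for (String id : instanceIds) {
            accum.put("instance", id);
            emitted.add(accum); // same instance every time
        }
        return emitted;
    }

    // Copies the map before each update so earlier emissions stay intact.
    static List<Map<String, String>> scanCopying(List<String> instanceIds) {
        Map<String, String> accum = new HashMap<>();
        List<Map<String, String>> emitted = new ArrayList<>();
        emitted.add(accum); // seed emission
        for (String id : instanceIds) {
            Map<String, String> next = new HashMap<>(accum);
            next.put("instance", id);
            accum = next;
            emitted.add(accum);
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<Map<String, String>> shared = scanMutating(Arrays.asList("a", "b"));
        System.out.println(shared.get(0) == shared.get(2)); // prints true
        System.out.println(shared.get(0));                  // prints {instance=b}

        List<Map<String, String>> copies = scanCopying(Arrays.asList("a", "b"));
        System.out.println(copies); // prints [{}, {instance=a}, {instance=b}]
    }
}
```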

Code example source: ReactiveX/RxJava

@Test
public void testScanWithRequestOne() {
  Observable<Integer> o = Observable.just(1, 2).scan(0, new BiFunction<Integer, Integer, Integer>() {
    @Override
    public Integer apply(Integer t1, Integer t2) {
      return t1 + t2;
    }
  }).take(1);
  TestObserver<Integer> observer = new TestObserver<Integer>();
  o.subscribe(observer);
  observer.assertValue(0);
  observer.assertTerminated();
  observer.assertNoErrors();
}

Code example source: ReactiveX/RxJava

@Test
public void testScanFunctionThrowsAndUpstreamEmitsOnNextResultsInScanFunctionBeingCalledOnlyOnce() {
  final RuntimeException err = new RuntimeException();
  final AtomicInteger count = new AtomicInteger();
  Observable.unsafeCreate(new ObservableSource<Integer>() {
    @Override
    public void subscribe(Observer<? super Integer> o) {
      Disposable d = Disposables.empty();
      o.onSubscribe(d);
      o.onNext(1);
      o.onNext(2);
      o.onNext(3);
    }})
  .scan(new BiFunction<Integer, Integer, Integer>() {
    @Override
    public Integer apply(Integer t1, Integer t2) throws Exception {
      count.incrementAndGet();
      throw err;
    }})
  .test()
  .assertError(err)
  .assertValue(1);
  assertEquals(1, count.get());
}

Code example source: ReactiveX/RxJava

@Test
public void testScanFunctionThrowsAndUpstreamCompletesDoesNotResultInTwoTerminalEvents() {
  final RuntimeException err = new RuntimeException();
  Observable.unsafeCreate(new ObservableSource<Integer>() {
    @Override
    public void subscribe(Observer<? super Integer> o) {
      Disposable d = Disposables.empty();
      o.onSubscribe(d);
      o.onNext(1);
      o.onNext(2);
      o.onComplete();
    }})
  .scan(new BiFunction<Integer, Integer, Integer>() {
    @Override
    public Integer apply(Integer t1, Integer t2) throws Exception {
      throw err;
    }})
  .test()
  .assertError(err)
  .assertValue(1);
}

Code example source: ReactiveX/RxJava

@SuppressWarnings("unused")
@Test
public void reduceWithObjects() {
  Observable<Movie> horrorMovies = Observable.<Movie> just(new HorrorMovie());
  Observable<Movie> reduceResult = horrorMovies.scan(new BiFunction<Movie, Movie, Movie>() {
    @Override
    public Movie apply(Movie t1, Movie t2) {
      return t2;
    }
  }).takeLast(1);
  Maybe<Movie> reduceResult2 = horrorMovies.reduce(new BiFunction<Movie, Movie, Movie>() {
    @Override
    public Movie apply(Movie t1, Movie t2) {
      return t2;
    }
  });
  assertNotNull(reduceResult2);
}

Code example source: ReactiveX/RxJava

@Test
public void error() {
  Observable.error(new TestException())
  .scan(new BiFunction<Object, Object, Object>() {
    @Override
    public Object apply(Object a, Object b) throws Exception {
      return a;
    }
  })
  .test()
  .assertFailure(TestException.class);
}

Code example source: ReactiveX/RxJava

@SuppressWarnings("unused")
@Test
public void reduceWithObjectsObservable() {
  Observable<Movie> horrorMovies = Observable.<Movie> just(new HorrorMovie());
  Observable<Movie> reduceResult = horrorMovies.scan(new BiFunction<Movie, Movie, Movie>() {
    @Override
    public Movie apply(Movie t1, Movie t2) {
      return t2;
    }
  }).takeLast(1);
  Observable<Movie> reduceResult2 = horrorMovies.reduce(new BiFunction<Movie, Movie, Movie>() {
    @Override
    public Movie apply(Movie t1, Movie t2) {
      return t2;
    }
  }).toObservable();
  assertNotNull(reduceResult2);
}
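
The two reduceWithObjects tests pair scan(...).takeLast(1) with reduce(...), which reflects how the operators relate: reduce yields only the final accumulation, while scan emits every intermediate one. A minimal plain-Java sketch of that relationship follows. ReduceVsScanSketch and its helpers are hypothetical names, and Optional stands in for the Maybe that reduce returns in the tests.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.function.BinaryOperator;

// Plain-Java model relating reduce to scan (hypothetical helper, not RxJava code).
class ReduceVsScanSketch {

    // Every intermediate accumulation, as unseeded scan would emit them.
    static <T> List<T> scan(List<T> source, BinaryOperator<T> accumulator) {
        List<T> emitted = new ArrayList<>();
        T acc = null;
        boolean first = true;
        for (T item : source) {
            acc = first ? item : accumulator.apply(acc, item);
            first = false;
            emitted.add(acc);
        }
        return emitted;
    }

    // Only the last accumulation; empty when the source has no items.
    static <T> Optional<T> reduce(List<T> source, BinaryOperator<T> accumulator) {
        List<T> all = scan(source, accumulator);
        return all.isEmpty() ? Optional.empty() : Optional.of(all.get(all.size() - 1));
    }

    public static void main(String[] args) {
        List<Integer> source = Arrays.asList(1, 2, 3, 4);
        System.out.println(scan(source, Integer::sum));   // prints [1, 3, 6, 10]
        System.out.println(reduce(source, Integer::sum)); // prints Optional[10]
        System.out.println(reduce(Collections.<Integer>emptyList(), Integer::sum)); // prints Optional.empty
    }
}
```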
