Usage and code examples for the rx.Observable.range() method

x33g5p2x, reposted 2022-01-25 under "Other"

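This article collects code examples of the Java method rx.Observable.range() and shows how Observable.range() is used in practice. The examples were extracted from selected projects found on platforms such as GitHub, Stack Overflow, and Maven, and are meant as reference material. Details of Observable.range():
Fully qualified class: rx.Observable
Class name: Observable
Method name: range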

About Observable.range

Returns an Observable that emits a sequence of Integers within a specified range.

Scheduler: range does not operate by default on a particular Scheduler.
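One detail worth spelling out: the two arguments of range(int start, int count) are a starting value and a count, not a start and an end bound, so Observable.range(5, 3) emits 5, 6, 7. The sketch below models that contract with the JDK's IntStream so that it runs without RxJava on the classpath. The class RangeSemantics and the helper rangeLike are hypothetical names introduced for illustration only; they are not part of RxJava.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Hypothetical helper, not RxJava: models which values range(start, count) covers.
final class RangeSemantics {

    // Returns the `count` consecutive integers beginning at `start`.
    static List<Integer> rangeLike(int start, int count) {
        if (count < 0) {
            // Assumption: reject negative counts, as RxJava 1.x does.
            throw new IllegalArgumentException("count must not be negative");
        }
        // Generate the indices 0..count-1 and offset each one by `start`.
        return IntStream.range(0, count)
                .map(i -> start + i)
                .boxed()
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(rangeLike(5, 3)); // prints [5, 6, 7]
        System.out.println(rangeLike(0, 0)); // prints []
        // A page-style window: page 2 with 10 items per page covers 20..29.
        System.out.println(rangeLike(2 * 10, 10).size()); // prints 10
    }
}
```

Contrast this with IntStream.range(a, b), whose second argument is an exclusive end bound; mixing up the two conventions is an easy source of off-by-N errors when porting code.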

Code examples

Source: konmik/nucleus

    private Observable<String> requestPage(int pageNumber, int pageSize) {
        // Emit the pageSize integers that belong to the requested page, as strings.
        return Observable.range(pageNumber * pageSize, pageSize).map(new Func1<Integer, String>() {
            @Override
            public String call(Integer integer) {
                return integer.toString();
            }
        });
    }

Source: bumptech/glide

    Observable<List<Image>> getHotViralImages(@SuppressWarnings("SameParameterValue") int maxPages) {
        return Observable.range(0, maxPages)
            .flatMap(new Func1<Integer, Observable<List<Image>>>() {
                @Override

Source: apache/usergrid

    @Test
    public void testSequence3() {
        // Collecting into a list by hand should match the built-in toList().
        ArrayList listReturn = Observable.range(0, 2)
            .collect(() -> new ArrayList(), (list, i) -> {
                list.add(i);
            }).toBlocking().first();
        Assert.assertEquals(listReturn, Observable.range(0, 2).toList().toBlocking().last());
    }

Source: apache/usergrid

    @Test( expected = TestException.class )
    public void throwOnBlockingLast() {
        Observable.range( 0, 1 ).map( integer -> {
            throw new TestException( "I throw an exception" );
        } ).toBlocking().last();
    }

Source: apache/usergrid

    public Observable<Entity> createStreamFromWorkers( final SearchEdge indexEdge,
                                                       final String uniqueIdentifier ) {
        // Create a sequence of observables. Each index runs on its own worker thread
        // via Schedulers.newThread().
        return Observable.range( 0, indexTestFig.getNumberOfWorkers() ).flatMap(
            integer -> createWriteObservable( indexEdge, uniqueIdentifier, integer )
                .subscribeOn( Schedulers.newThread() ) );
    }

Source: apache/usergrid

    @Test( expected = TestException.class )
    public void throwOnBlockingFirst() {
        Observable.range( 0, 1 ).map( integer -> {
            throw new TestException( "I throw an exception" );
        } ).toBlocking().first();
    }

Source: apache/usergrid

    @Test
    public void testSequence() {
        // flatMap to empty() drops every item, so collect yields an empty list.
        ArrayList listReturn = Observable.range(0, 1).flatMap(i -> Observable.empty())
            .collect(() -> new ArrayList(), (list, i) -> {
                list.add(i);
            }).toBlocking().lastOrDefault(null);
        Assert.assertEquals(listReturn, new ArrayList<Integer>());
    }

Source: apache/usergrid

    @Test
    public void testSequence2() {
        // Same as above, but the items are buffered in pairs before being dropped.
        ArrayList listReturn = Observable.range(0, 2).buffer(2).flatMap(i -> Observable.empty())
            .collect(() -> new ArrayList(), (list, i) -> {
                list.add(i);
            }).toBlocking().lastOrDefault(null);
        Assert.assertEquals(listReturn, new ArrayList<Integer>());
    }

Source: apache/usergrid

    /**
     * Tests that reduce still emits its seed value when the filter lets nothing through
     */
    @Test
    public void testReduceEmpty() {
        final int result = Observable.range( 0, 100 )
            .filter( value -> value == -1 )
            .reduce( 0, ( integer, integer2 ) -> integer + 1 )
            .toBlocking().last();
        assertEquals( 0, result );
    }
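The test above relies on a seeded reduce handing back its seed when nothing upstream survives the filter. The JDK's IntStream.reduce(identity, op) follows the same rule, which allows a dependency-free sketch of the idea. This is an analogy built on java.util.stream rather than RxJava code, and SeededReduceDemo and countMatches are hypothetical names.

```java
import java.util.function.IntPredicate;
import java.util.stream.IntStream;

// Hypothetical demo class; java.util.stream stands in for RxJava here.
final class SeededReduceDemo {

    // Counts the values in [0, 100) that satisfy the predicate by mapping
    // each match to 1 and folding the ones together, starting from a seed of 0.
    static int countMatches(IntPredicate predicate) {
        return IntStream.range(0, 100)
                .filter(predicate)
                .map(value -> 1)
                .reduce(0, Integer::sum);
    }

    public static void main(String[] args) {
        // No value equals -1, so nothing is folded and the seed comes back.
        System.out.println(countMatches(value -> value == -1)); // prints 0
        // Fifty even numbers lie in [0, 100).
        System.out.println(countMatches(value -> value % 2 == 0)); // prints 50
    }
}
```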

Source: apache/usergrid

    /**
     * Tests working with observers
     */
    @Test( expected = TestException.class )
    public void throwOnSubscribeObservable() {
        final ReThrowObserver exceptionObserver = new ReThrowObserver();
        Observable.range( 0, 1 ).map( integer -> {
            throw new TestException( "I throw an exception" );
        } ).subscribe( exceptionObserver );
        exceptionObserver.checkResult();
    }

Source: apache/usergrid

    @Test
    @Category( ExperimentalTest.class )
    public void testPublish() throws InterruptedException {
        final int count = 10;
        // One countdown per emitted item, plus one for the completion signal.
        final CountDownLatch latch = new CountDownLatch( count + 1 );
        final Subscription connectedObservable =
            Observable.range( 0, count )
                .doOnNext( integer -> latch.countDown() )
                .doOnCompleted( () -> latch.countDown() ).subscribeOn( Schedulers.io() )
                .subscribe();
        final boolean completed = latch.await( 3, TimeUnit.SECONDS );
        assertTrue( "publish1 behaves as expected", completed );
        final boolean completedSubscription = connectedObservable.isUnsubscribed();
        assertTrue( "Subscription complete", completedSubscription );
    }

Source: ReactiveX/RxNetty

    @Test(timeout = 60000)
    public void testScheduleNow() throws Exception {
        RxJavaEventloopScheduler scheduler = new RxJavaEventloopScheduler(new NioEventLoopGroup());
        TestSubscriber<Integer> testSubscriber = new TestSubscriber<>();
        Observable.range(1, 1)
            .observeOn(scheduler)
            .subscribe(testSubscriber);
        testSubscriber.awaitTerminalEvent();
        testSubscriber.assertNoErrors();
        testSubscriber.assertValue(1);
    }

Source: apache/usergrid

    private List<MarkedEdge> createConnectionSearchEdges( final Entity testEntity, final GraphManager graphManager,
                                                          final int edgeCount ) {
        final List<MarkedEdge> connectionSearchEdges = Observable.range( 0, edgeCount ).flatMap( integer -> {
            // Create our connection edge.
            final Id connectingId = createId( "connecting" );
            final Edge connectionEdge = CpNamingUtils.createConnectionEdge( connectingId, "likes", testEntity.getId() );
            return graphManager.writeEdge( connectionEdge ).subscribeOn( Schedulers.io() );
        }, 20 ).toList().toBlocking().last();
        assertEquals( "All edges saved", edgeCount, connectionSearchEdges.size() );
        return connectionSearchEdges;
    }

Source: Netflix/conductor

    @Test
    public void test() {
        List<Message> messages = new LinkedList<>();
        Observable.range(0, 10).forEach((Integer x) -> messages.add(new Message("" + x, "payload: " + x, null)));
        assertEquals(10, messages.size());
        SQSObservableQueue queue = mock(SQSObservableQueue.class);
        when(queue.getOrCreateQueue()).thenReturn("junit_queue_url");
        Answer<?> answer = (Answer<List<Message>>) invocation -> Collections.emptyList();
        when(queue.receiveMessages()).thenReturn(messages).thenAnswer(answer);
        when(queue.getOnSubscribe()).thenCallRealMethod();
        when(queue.observe()).thenCallRealMethod();
        List<Message> found = new LinkedList<>();
        Observable<Message> observable = queue.observe();
        assertNotNull(observable);
        observable.subscribe(found::add);
        Uninterruptibles.sleepUninterruptibly(1000, TimeUnit.MILLISECONDS);
        assertEquals(messages.size(), found.size());
        assertEquals(messages, found);
    }

Source: apache/usergrid

    @Test
    @Category( ExperimentalTest.class )
    public void testConnectableObserver() throws InterruptedException {
        final int count = 10;
        final CountDownLatch latch = new CountDownLatch( count );
        final ConnectableObservable<Integer> connectedObservable = Observable.range( 0, count ).publish();
        // Connect to our latch, which should run on its own subscription.
        // Start our latch running.
        connectedObservable.doOnNext( integer -> latch.countDown() ).subscribeOn( Schedulers.io() ).subscribe();
        final Observable<Integer> countObservable = connectedObservable.subscribeOn( Schedulers.io() ).count();
        // Start the sequence.
        connectedObservable.connect();
        final boolean completed = latch.await( 5, TimeUnit.SECONDS );
        assertTrue( "publish1 behaves as expected", completed );
        final int returnedCount = countObservable.toBlocking().last();
        assertEquals( "Counts the same", count, returnedCount );
    }

Source: apache/usergrid

    /**
     * Tests working with observers
     */
    @Test( expected = TestException.class )
    public void throwOnSubscribeObservableNewThread() throws Exception {
        final ReThrowObserver exceptionObserver = new ReThrowObserver();
        Observable.range( 0, 1 ).map( integer -> {
            throw new TestException( "I throw an exception" );
        } )
            .doOnError( t -> exceptionObserver.onError( t ) )
            .subscribeOn( Schedulers.newThread() )
            .subscribe( exceptionObserver );
        // Poll for the error, since it is raised on another thread.
        for ( int i = 0; i < 5; i++ ) {
            exceptionObserver.checkResult();
            Thread.sleep( 200 );
        }
    }

Source: konmik/nucleus

    .startWith(Observable.range(0, requestedPageCount))
    .concatMap(new Func1<Integer, Observable<String>>() {
        @Override

Source: apache/usergrid

    final List<MarkedEdge> connectionSearchEdges = Observable.range( 0, edgeCount ).flatMap( integer -> {
        final Id connectingId = createId( "connecting" );
        final Edge edge = CpNamingUtils.createConnectionEdge( connectingId, "likes", testEntity.getId() );

Source: apache/usergrid

    final List<MarkedEdge> connectionSearchEdges = Observable.range( 0, 500 ).flatMap( integer -> {
        final Id connectingId = createId( "connecting" );
        final Edge edge = CpNamingUtils.createConnectionEdge( connectingId, "likes", testEntity.getId() );

Source: apache/usergrid

    return Observable.range( 0, indexTestFig.getNumberOfRecords() )
