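This article collects code examples for the rx.Observable.range() method in Java and shows how Observable.range() is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven, and were extracted from selected open-source projects, so they should serve as a useful reference. Details of the Observable.range() method are as follows: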
Package path: rx.Observable
Class name: Observable
Method name: range
Returns an Observable that emits a sequence of Integers within a specified range.
Scheduler: range does not operate by default on a particular Scheduler.
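One detail the description leaves implicit is that the two arguments are a start value and a count, not a start and an end. The sketch below is a plain-JDK analogue, not RxJava itself: `RangeSketch` and its `range` helper are hypothetical names introduced here, and the helper only lists the values that `Observable.range(start, count)` is expected to emit. The argument checks mirror the `IllegalArgumentException` cases described in the RxJava 1.x Javadoc (negative count, or `start + count - 1` exceeding `Integer.MAX_VALUE`).

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Hypothetical helper class: models the values of range(start, count) with JDK streams only.
class RangeSketch {

    // Returns the integers start, start + 1, ..., start + count - 1 as a List.
    static List<Integer> range(int start, int count) {
        if (count < 0) {
            throw new IllegalArgumentException("count must not be negative");
        }
        // Overflow-safe form of: start + count - 1 > Integer.MAX_VALUE
        if (count > 0 && start > Integer.MAX_VALUE - count + 1) {
            throw new IllegalArgumentException("start + count - 1 exceeds Integer.MAX_VALUE");
        }
        // The second argument is a count, so generate count offsets and shift them by start.
        return IntStream.range(0, count)
                .map(i -> start + i)
                .boxed()
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(range(5, 3)); // prints [5, 6, 7]
        System.out.println(range(0, 0)); // prints []
    }
}
```

Note the contrast with `IntStream.range(a, b)`, whose second argument is an exclusive end rather than a count.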
Code example source: origin: konmik/nucleus
private Observable<String> requestPage(int pageNumber, int pageSize) {
    // Emit pageSize consecutive integers starting at pageNumber * pageSize, each converted to a String.
    return Observable.range(pageNumber * pageSize, pageSize).map(new Func1<Integer, String>() {
        @Override
        public String call(Integer integer) {
            return integer.toString();
        }
    });
}
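The paging arithmetic in requestPage can be checked without RxJava. The following is a minimal JDK-only stand-in, assuming the same start and count values; `PagingSketch` is a hypothetical name, and the method returns a List rather than an Observable, so it illustrates only which values each page contains.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Hypothetical stand-in for requestPage: same start/count arithmetic, but returns a List.
class PagingSketch {

    static List<String> requestPage(int pageNumber, int pageSize) {
        // Same start value as Observable.range(pageNumber * pageSize, pageSize).
        int first = pageNumber * pageSize;
        return IntStream.range(first, first + pageSize)
                .mapToObj(i -> Integer.toString(i))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(requestPage(0, 3)); // prints [0, 1, 2]
        System.out.println(requestPage(2, 3)); // prints [6, 7, 8]
    }
}
```

Because each page starts at pageNumber * pageSize and holds exactly pageSize values, consecutive pages do not overlap.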
Code example source: origin: bumptech/glide
Observable<List<Image>> getHotViralImages(@SuppressWarnings("SameParameterValue") int maxPages) {
    return Observable.range(0, maxPages)
        .flatMap(new Func1<Integer, Observable<List<Image>>>() {
            @Override
Code example source: origin: apache/usergrid
@Test()
public void testSequence3(){
    // Collecting range(0, 2) by hand should match the result of toList().
    ArrayList listReturn = Observable.range(0, 2)
        .collect(()->new ArrayList(),(list,i) ->{
            list.add(i);
        }).toBlocking().first();
    Assert.assertEquals(listReturn, Observable.range(0, 2).toList().toBlocking().last());
}
Code example source: origin: apache/usergrid
@Test( expected = TestException.class )
public void throwOnBlockingLast() {
    Observable.range( 0, 1 ).map( integer -> {
        throw new TestException( "I throw and exception" );
    } ).toBlocking().last();
}
Code example source: origin: apache/usergrid
public Observable<Entity> createStreamFromWorkers( final SearchEdge indexEdge,
                                                   final String uniqueIdentifier ) {
    // Create a sequence of observables. Each index runs on its own worker thread via Schedulers.newThread().
    return Observable.range( 0, indexTestFig.getNumberOfWorkers() ).flatMap(
        integer -> createWriteObservable( indexEdge, uniqueIdentifier, integer )
            .subscribeOn( Schedulers.newThread() ) );
}
Code example source: origin: apache/usergrid
@Test( expected = TestException.class )
public void throwOnBlockingFirst() {
    Observable.range( 0, 1 ).map( integer -> {
        throw new TestException( "I throw and exception" );
    } ).toBlocking().first();
}
Code example source: origin: apache/usergrid
@Test()
public void testSequence(){
    ArrayList listReturn = Observable.range(0, 1).flatMap(i -> Observable.empty())
        .collect(()->new ArrayList(),(list,i) ->{
            list.add(i);
        }).toBlocking().lastOrDefault(null);
    Assert.assertEquals(listReturn,new ArrayList<Integer>());
}
Code example source: origin: apache/usergrid
@Test()
public void testSequence2(){
    ArrayList listReturn = Observable.range(0, 2).buffer(2).flatMap(i -> Observable.empty())
        .collect(()->new ArrayList(),(list,i) ->{
            list.add(i);
        }).toBlocking().lastOrDefault(null);
    Assert.assertEquals(listReturn,new ArrayList<Integer>());
}
Code example source: origin: apache/usergrid
/**
 * Tests that reduce emits its seed value when the filtered source is empty
 */
@Test
public void testReduceEmpty(){
    final int result = Observable.range( 0, 100 ).filter( value -> value == -1 ).reduce( 0, ( integer, integer2 ) -> integer + 1 ).toBlocking().last();
    assertEquals(0, result);
}
Code example source: origin: apache/usergrid
/**
 * Tests working with observers
 */
@Test( expected = TestException.class )
public void throwOnSubscribeObservable() {
    final ReThrowObserver exceptionObserver = new ReThrowObserver();
    Observable.range( 0, 1 ).map( integer -> {
        throw new TestException( "I throw and exception" );
    } ).subscribe( exceptionObserver );
    exceptionObserver.checkResult();
}
Code example source: origin: apache/usergrid
@Test
@Category(ExperimentalTest.class )
public void testPublish() throws InterruptedException {
    final int count = 10;
    // One countDown per emitted item, plus one for completion.
    final CountDownLatch latch = new CountDownLatch( count+1 );
    final Subscription connectedObservable =
        Observable.range( 0, count )
            .doOnNext( integer -> latch.countDown() )
            .doOnCompleted( () -> latch.countDown() ).subscribeOn( Schedulers.io() )
            .subscribe();
    final boolean completed = latch.await( 3, TimeUnit.SECONDS );
    assertTrue( "publish1 behaves as expected", completed );
    final boolean completedSubscription = connectedObservable.isUnsubscribed();
    assertTrue( "Subscription complete", completedSubscription );
}
Code example source: origin: ReactiveX/RxNetty
@Test(timeout = 60000)
public void testScheduleNow() throws Exception {
    RxJavaEventloopScheduler scheduler = new RxJavaEventloopScheduler(new NioEventLoopGroup());
    TestSubscriber<Integer> testSubscriber = new TestSubscriber<>();
    Observable.range(1, 1)
        .observeOn(scheduler)
        .subscribe(testSubscriber);
    testSubscriber.awaitTerminalEvent();
    testSubscriber.assertNoErrors();
    testSubscriber.assertValue(1);
}
Code example source: origin: apache/usergrid
private List<MarkedEdge> createConnectionSearchEdges( final Entity testEntity, final GraphManager graphManager,
                                                      final int edgeCount ) {
    final List<MarkedEdge> connectionSearchEdges = Observable.range( 0, edgeCount ).flatMap( integer -> {
        // Create our connection edge.
        final Id connectingId = createId( "connecting" );
        final Edge connectionEdge = CpNamingUtils.createConnectionEdge( connectingId, "likes", testEntity.getId() );
        return graphManager.writeEdge( connectionEdge ).subscribeOn( Schedulers.io() );
    }, 20).toList().toBlocking().last();
    assertEquals( "All edges saved", edgeCount, connectionSearchEdges.size() );
    return connectionSearchEdges;
}
Code example source: origin: Netflix/conductor
@Test
public void test() {
    List<Message> messages = new LinkedList<>();
    Observable.range(0, 10).forEach((Integer x) -> messages.add(new Message("" + x, "payload: " + x, null)));
    assertEquals(10, messages.size());
    SQSObservableQueue queue = mock(SQSObservableQueue.class);
    when(queue.getOrCreateQueue()).thenReturn("junit_queue_url");
    Answer<?> answer = (Answer<List<Message>>) invocation -> Collections.emptyList();
    when(queue.receiveMessages()).thenReturn(messages).thenAnswer(answer);
    when(queue.getOnSubscribe()).thenCallRealMethod();
    when(queue.observe()).thenCallRealMethod();
    List<Message> found = new LinkedList<>();
    Observable<Message> observable = queue.observe();
    assertNotNull(observable);
    observable.subscribe(found::add);
    Uninterruptibles.sleepUninterruptibly(1000, TimeUnit.MILLISECONDS);
    assertEquals(messages.size(), found.size());
    assertEquals(messages, found);
}
Code example source: origin: apache/usergrid
@Test
@Category(ExperimentalTest.class )
public void testConnectableObserver() throws InterruptedException {
    final int count = 10;
    final CountDownLatch latch = new CountDownLatch( count );
    final ConnectableObservable<Integer> connectedObservable = Observable.range( 0, count ).publish();
    // Connect to our latch, which should run on its own subscription.
    // Start our latch running.
    connectedObservable.doOnNext( integer -> latch.countDown() ).subscribeOn( Schedulers.io() ).subscribe();
    final Observable<Integer> countObservable = connectedObservable.subscribeOn( Schedulers.io() ).count();
    // Start the sequence.
    connectedObservable.connect();
    final boolean completed = latch.await( 5, TimeUnit.SECONDS );
    assertTrue( "publish1 behaves as expected", completed );
    final int returnedCount = countObservable.toBlocking().last();
    assertEquals( "Counts the same", count, returnedCount );
}
Code example source: origin: apache/usergrid
/**
 * Tests working with observers
 */
@Test( expected = TestException.class )
public void throwOnSubscribeObservableNewThread() throws Exception {
    final ReThrowObserver exceptionObserver = new ReThrowObserver();
    Observable.range( 0, 1 ).map(integer -> {
        throw new TestException("I throw and exception");
    })
        .doOnError(t -> exceptionObserver.onError(t))
        .subscribeOn(Schedulers.newThread())
        .subscribe(exceptionObserver);
    // Poll for the error, since it is raised on another thread.
    for(int i =0; i<5; i++) {
        exceptionObserver.checkResult();
        Thread.sleep(200);
    }
}
Code example source: origin: konmik/nucleus
.startWith(Observable.range(0, requestedPageCount))
.concatMap(new Func1<Integer, Observable<String>>() {
    @Override
Code example source: origin: apache/usergrid
final List<MarkedEdge> connectionSearchEdges = Observable.range( 0, edgeCount ).flatMap( integer -> {
    final Id connectingId = createId( "connecting" );
    final Edge edge = CpNamingUtils.createConnectionEdge( connectingId, "likes", testEntity.getId() );
Code example source: origin: apache/usergrid
final List<MarkedEdge> connectionSearchEdges = Observable.range( 0, 500 ).flatMap(integer -> {
    final Id connectingId = createId("connecting");
    final Edge edge = CpNamingUtils.createConnectionEdge(connectingId, "likes", testEntity.getId());
Code example source: origin: apache/usergrid
return Observable.range( 0, indexTestFig.getNumberOfRecords() )