本文整理了Java中rx.Observable.take()
方法的一些代码示例,展示了Observable.take()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Observable.take()
方法的具体详情如下:
包路径:rx.Observable
类名称:Observable
方法名:take
[英]Returns an Observable that emits only the first num items emitted by the source Observable.
This method returns an Observable that will invoke a subscribing Observer's Observer#onNext function a maximum of num times before invoking Observer#onCompleted. Scheduler: This version of take does not operate by default on a particular Scheduler.
[中]返回仅发射源可观测项发射的前num项的可观测项。
此方法返回一个Observable,在调用Observator#onCompleted之前,该Observable将调用订阅观察器的Observator#onNext函数最多num次。调度程序:默认情况下,此版本的take不会在特定调度程序上运行。
代码示例来源:origin: RichardWarburton/java-8-lambdas-exercises
public Observable<Artist> search(String searchedName,
String searchedNationality,
int maxResults) {
return getSavedArtists() // <1>
.filter(name -> name.contains(searchedName)) // <2>
.flatMap(this::lookupArtist) // <3>
.filter(artist -> artist.getNationality() // <4>
.contains(searchedNationality))
.take(maxResults); // <5>
}
// END search
代码示例来源:origin: PipelineAI/pipeline
@Test
public void noEvents() throws InterruptedException {
CountDownLatch commandLatch = new CountDownLatch(1);
CountDownLatch threadPoolLatch = new CountDownLatch(1);
Subscriber<HystrixCommandCompletion> commandSubscriber = getLatchedSubscriber(commandLatch);
readCommandStream.observe().take(1).subscribe(commandSubscriber);
Subscriber<HystrixCommandCompletion> threadPoolSubscriber = getLatchedSubscriber(threadPoolLatch);
readThreadPoolStream.observe().take(1).subscribe(threadPoolSubscriber);
//no writes
assertFalse(commandLatch.await(1000, TimeUnit.MILLISECONDS));
assertFalse(threadPoolLatch.await(1000, TimeUnit.MILLISECONDS));
}
代码示例来源:origin: PipelineAI/pipeline
@Test
public void noEvents() throws InterruptedException {
CountDownLatch latch = new CountDownLatch(1);
Subscriber<HystrixCommandCompletion> subscriber = getLatchedSubscriber(latch);
commandStream.observe().take(1).subscribe(subscriber);
//no writes
assertFalse(latch.await(1000, TimeUnit.MILLISECONDS));
}
代码示例来源:origin: PipelineAI/pipeline
@Test
public void testSemaphoreIsolatedSuccess() throws Exception {
CountDownLatch commandLatch = new CountDownLatch(1);
CountDownLatch threadPoolLatch = new CountDownLatch(1);
Subscriber<HystrixCommandCompletion> commandSubscriber = getLatchedSubscriber(commandLatch);
readCommandStream.observe().take(1).subscribe(commandSubscriber);
Subscriber<HystrixCommandCompletion> threadPoolSubscriber = getLatchedSubscriber(threadPoolLatch);
readThreadPoolStream.observe().take(1).subscribe(threadPoolSubscriber);
ExecutionResult result = ExecutionResult.from(HystrixEventType.SUCCESS);
writeToStream.executionDone(result, commandKey, threadPoolKey);
assertTrue(commandLatch.await(1000, TimeUnit.MILLISECONDS));
assertFalse(threadPoolLatch.await(1000, TimeUnit.MILLISECONDS));
}
代码示例来源:origin: PipelineAI/pipeline
@Test
public void testSemaphoreIsolatedTimeout() throws Exception {
CountDownLatch commandLatch = new CountDownLatch(1);
CountDownLatch threadPoolLatch = new CountDownLatch(1);
Subscriber<HystrixCommandCompletion> commandSubscriber = getLatchedSubscriber(commandLatch);
readCommandStream.observe().take(1).subscribe(commandSubscriber);
Subscriber<HystrixCommandCompletion> threadPoolSubscriber = getLatchedSubscriber(threadPoolLatch);
readThreadPoolStream.observe().take(1).subscribe(threadPoolSubscriber);
ExecutionResult result = ExecutionResult.from(HystrixEventType.TIMEOUT);
writeToStream.executionDone(result, commandKey, threadPoolKey);
assertTrue(commandLatch.await(1000, TimeUnit.MILLISECONDS));
assertFalse(threadPoolLatch.await(1000, TimeUnit.MILLISECONDS));
}
代码示例来源:origin: PipelineAI/pipeline
@Test
public void testSemaphoreRejectedCommand() throws Exception {
CountDownLatch commandLatch = new CountDownLatch(1);
CountDownLatch threadPoolLatch = new CountDownLatch(1);
Subscriber<HystrixCommandCompletion> commandSubscriber = getLatchedSubscriber(commandLatch);
readCommandStream.observe().take(1).subscribe(commandSubscriber);
Subscriber<HystrixCommandCompletion> threadPoolSubscriber = getLatchedSubscriber(threadPoolLatch);
readThreadPoolStream.observe().take(1).subscribe(threadPoolSubscriber);
ExecutionResult result = ExecutionResult.from(HystrixEventType.SEMAPHORE_REJECTED);
writeToStream.executionDone(result, commandKey, threadPoolKey);
assertTrue(commandLatch.await(1000, TimeUnit.MILLISECONDS));
assertFalse(threadPoolLatch.await(1000, TimeUnit.MILLISECONDS));
}
代码示例来源:origin: PipelineAI/pipeline
@Test
public void testThreadRejectedCommand() throws Exception {
CountDownLatch commandLatch = new CountDownLatch(1);
CountDownLatch threadPoolLatch = new CountDownLatch(1);
Subscriber<HystrixCommandCompletion> commandSubscriber = getLatchedSubscriber(commandLatch);
readCommandStream.observe().take(1).subscribe(commandSubscriber);
Subscriber<HystrixCommandCompletion> threadPoolSubscriber = getLatchedSubscriber(threadPoolLatch);
readThreadPoolStream.observe().take(1).subscribe(threadPoolSubscriber);
ExecutionResult result = ExecutionResult.from(HystrixEventType.THREAD_POOL_REJECTED);
writeToStream.executionDone(result, commandKey, threadPoolKey);
assertTrue(commandLatch.await(1000, TimeUnit.MILLISECONDS));
assertTrue(threadPoolLatch.await(1000, TimeUnit.MILLISECONDS));
}
代码示例来源:origin: PipelineAI/pipeline
@Test
public void testSemaphoreIsolatedFailure() throws Exception {
CountDownLatch commandLatch = new CountDownLatch(1);
CountDownLatch threadPoolLatch = new CountDownLatch(1);
Subscriber<HystrixCommandCompletion> commandSubscriber = getLatchedSubscriber(commandLatch);
readCommandStream.observe().take(1).subscribe(commandSubscriber);
Subscriber<HystrixCommandCompletion> threadPoolSubscriber = getLatchedSubscriber(threadPoolLatch);
readThreadPoolStream.observe().take(1).subscribe(threadPoolSubscriber);
ExecutionResult result = ExecutionResult.from(HystrixEventType.FAILURE);
writeToStream.executionDone(result, commandKey, threadPoolKey);
assertTrue(commandLatch.await(1000, TimeUnit.MILLISECONDS));
assertFalse(threadPoolLatch.await(1000, TimeUnit.MILLISECONDS));
}
代码示例来源:origin: PipelineAI/pipeline
@Test
public void testSemaphoreIsolatedBadRequest() throws Exception {
CountDownLatch commandLatch = new CountDownLatch(1);
CountDownLatch threadPoolLatch = new CountDownLatch(1);
Subscriber<HystrixCommandCompletion> commandSubscriber = getLatchedSubscriber(commandLatch);
readCommandStream.observe().take(1).subscribe(commandSubscriber);
Subscriber<HystrixCommandCompletion> threadPoolSubscriber = getLatchedSubscriber(threadPoolLatch);
readThreadPoolStream.observe().take(1).subscribe(threadPoolSubscriber);
ExecutionResult result = ExecutionResult.from(HystrixEventType.BAD_REQUEST);
writeToStream.executionDone(result, commandKey, threadPoolKey);
assertTrue(commandLatch.await(1000, TimeUnit.MILLISECONDS));
assertFalse(threadPoolLatch.await(1000, TimeUnit.MILLISECONDS));
}
代码示例来源:origin: PipelineAI/pipeline
@Test
public void testThreadIsolatedSuccess() throws InterruptedException {
CountDownLatch commandLatch = new CountDownLatch(1);
CountDownLatch threadPoolLatch = new CountDownLatch(1);
Subscriber<HystrixCommandCompletion> commandSubscriber = getLatchedSubscriber(commandLatch);
readCommandStream.observe().take(1).subscribe(commandSubscriber);
Subscriber<HystrixCommandCompletion> threadPoolSubscriber = getLatchedSubscriber(threadPoolLatch);
readThreadPoolStream.observe().take(1).subscribe(threadPoolSubscriber);
ExecutionResult result = ExecutionResult.from(HystrixEventType.SUCCESS).setExecutedInThread();
writeToStream.executionDone(result, commandKey, threadPoolKey);
assertTrue(commandLatch.await(1000, TimeUnit.MILLISECONDS));
assertTrue(threadPoolLatch.await(1000, TimeUnit.MILLISECONDS));
}
代码示例来源:origin: PipelineAI/pipeline
@Test
public void testShortCircuit() throws Exception {
CountDownLatch commandLatch = new CountDownLatch(1);
CountDownLatch threadPoolLatch = new CountDownLatch(1);
Subscriber<HystrixCommandCompletion> commandSubscriber = getLatchedSubscriber(commandLatch);
readCommandStream.observe().take(1).subscribe(commandSubscriber);
Subscriber<HystrixCommandCompletion> threadPoolSubscriber = getLatchedSubscriber(threadPoolLatch);
readThreadPoolStream.observe().take(1).subscribe(threadPoolSubscriber);
ExecutionResult result = ExecutionResult.from(HystrixEventType.SHORT_CIRCUITED);
writeToStream.executionDone(result, commandKey, threadPoolKey);
assertTrue(commandLatch.await(1000, TimeUnit.MILLISECONDS));
assertFalse(threadPoolLatch.await(1000, TimeUnit.MILLISECONDS));
}
}
代码示例来源:origin: PipelineAI/pipeline
@Test
public void testThreadIsolatedFailure() throws Exception {
CountDownLatch commandLatch = new CountDownLatch(1);
CountDownLatch threadPoolLatch = new CountDownLatch(1);
Subscriber<HystrixCommandCompletion> commandSubscriber = getLatchedSubscriber(commandLatch);
readCommandStream.observe().take(1).subscribe(commandSubscriber);
Subscriber<HystrixCommandCompletion> threadPoolSubscriber = getLatchedSubscriber(threadPoolLatch);
readThreadPoolStream.observe().take(1).subscribe(threadPoolSubscriber);
ExecutionResult result = ExecutionResult.from(HystrixEventType.FAILURE).setExecutedInThread();
writeToStream.executionDone(result, commandKey, threadPoolKey);
assertTrue(commandLatch.await(1000, TimeUnit.MILLISECONDS));
assertTrue(threadPoolLatch.await(1000, TimeUnit.MILLISECONDS));
}
代码示例来源:origin: PipelineAI/pipeline
@Test
public void testThreadIsolatedTimeout() throws Exception {
CountDownLatch commandLatch = new CountDownLatch(1);
CountDownLatch threadPoolLatch = new CountDownLatch(1);
Subscriber<HystrixCommandCompletion> commandSubscriber = getLatchedSubscriber(commandLatch);
readCommandStream.observe().take(1).subscribe(commandSubscriber);
Subscriber<HystrixCommandCompletion> threadPoolSubscriber = getLatchedSubscriber(threadPoolLatch);
readThreadPoolStream.observe().take(1).subscribe(threadPoolSubscriber);
ExecutionResult result = ExecutionResult.from(HystrixEventType.TIMEOUT).setExecutedInThread();
writeToStream.executionDone(result, commandKey, threadPoolKey);
assertTrue(commandLatch.await(1000, TimeUnit.MILLISECONDS));
assertTrue(threadPoolLatch.await(1000, TimeUnit.MILLISECONDS));
}
代码示例来源:origin: PipelineAI/pipeline
@Test
public void testThreadIsolatedBadRequest() throws Exception {
CountDownLatch commandLatch = new CountDownLatch(1);
CountDownLatch threadPoolLatch = new CountDownLatch(1);
Subscriber<HystrixCommandCompletion> commandSubscriber = getLatchedSubscriber(commandLatch);
readCommandStream.observe().take(1).subscribe(commandSubscriber);
Subscriber<HystrixCommandCompletion> threadPoolSubscriber = getLatchedSubscriber(threadPoolLatch);
readThreadPoolStream.observe().take(1).subscribe(threadPoolSubscriber);
ExecutionResult result = ExecutionResult.from(HystrixEventType.BAD_REQUEST).setExecutedInThread();
writeToStream.executionDone(result, commandKey, threadPoolKey);
assertTrue(commandLatch.await(1000, TimeUnit.MILLISECONDS));
assertTrue(threadPoolLatch.await(1000, TimeUnit.MILLISECONDS));
}
代码示例来源:origin: PipelineAI/pipeline
@Test
public void testSingleWriteMultipleSubscribers() throws InterruptedException {
CountDownLatch latch1 = new CountDownLatch(1);
Subscriber<HystrixCommandCompletion> subscriber1 = getLatchedSubscriber(latch1);
CountDownLatch latch2 = new CountDownLatch(1);
Subscriber<HystrixCommandCompletion> subscriber2 = getLatchedSubscriber(latch2);
commandStream.observe().take(1).subscribe(subscriber1);
commandStream.observe().take(1).subscribe(subscriber2);
ExecutionResult result = ExecutionResult.from(HystrixEventType.SUCCESS).setExecutedInThread();
HystrixCommandCompletion event = HystrixCommandCompletion.from(result, commandKey, threadPoolKey);
commandStream.write(event);
assertTrue(latch1.await(1000, TimeUnit.MILLISECONDS));
assertTrue(latch2.await(10, TimeUnit.MILLISECONDS));
}
}
代码示例来源:origin: PipelineAI/pipeline
@Test
public void testSingleWriteSingleSubscriber() throws InterruptedException {
CountDownLatch latch = new CountDownLatch(1);
Subscriber<HystrixCommandCompletion> subscriber = getLatchedSubscriber(latch);
commandStream.observe().take(1).subscribe(subscriber);
ExecutionResult result = ExecutionResult.from(HystrixEventType.SUCCESS).setExecutedInThread();
HystrixCommandCompletion event = HystrixCommandCompletion.from(result, commandKey, threadPoolKey);
commandStream.write(event);
assertTrue(latch.await(1000, TimeUnit.MILLISECONDS));
}
代码示例来源:origin: PipelineAI/pipeline
@Test
public void testEmptyStreamProducesZeros() {
HystrixCommandKey key = HystrixCommandKey.Factory.asKey("CMD-Concurrency-A");
stream = RollingCommandMaxConcurrencyStream.getInstance(key, 10, 500);
stream.startCachingStreamValuesIfUnstarted();
final CountDownLatch latch = new CountDownLatch(1);
stream.observe().take(5).subscribe(getSubscriber(latch));
//no writes
try {
assertTrue(latch.await(10000, TimeUnit.MILLISECONDS));
} catch (InterruptedException ex) {
fail("Interrupted ex");
}
assertEquals(0, stream.getLatestRollingMax());
}
代码示例来源:origin: PipelineAI/pipeline
@Test
public void testStartsAndEndsInSameBucketProduceValue() throws InterruptedException {
HystrixCommandKey key = HystrixCommandKey.Factory.asKey("CMD-Concurrency-B");
stream = RollingCommandMaxConcurrencyStream.getInstance(key, 10, 500);
stream.startCachingStreamValuesIfUnstarted();
final CountDownLatch latch = new CountDownLatch(1);
stream.observe().take(5).subscribe(getSubscriber(latch));
Command cmd1 = Command.from(groupKey, key, HystrixEventType.SUCCESS, 100);
Command cmd2 = Command.from(groupKey, key, HystrixEventType.SUCCESS, 100);
cmd1.observe();
Thread.sleep(1);
cmd2.observe();
assertTrue(latch.await(10000, TimeUnit.MILLISECONDS));
assertEquals(2, stream.getLatestRollingMax());
}
代码示例来源:origin: PipelineAI/pipeline
@Test
public void testStartsAndEndsInSameBucketProduceValue() throws InterruptedException {
HystrixCommandGroupKey groupKey = HystrixCommandGroupKey.Factory.asKey("ThreadPool-Concurrency-B");
HystrixThreadPoolKey threadPoolKey = HystrixThreadPoolKey.Factory.asKey("ThreadPool-Concurrency-B");
HystrixCommandKey key = HystrixCommandKey.Factory.asKey("RollingConcurrency-B");
stream = RollingThreadPoolMaxConcurrencyStream.getInstance(threadPoolKey, 10, 100);
stream.startCachingStreamValuesIfUnstarted();
final CountDownLatch latch = new CountDownLatch(1);
stream.observe().take(10).subscribe(getSubscriber(latch));
Command cmd1 = Command.from(groupKey, key, HystrixEventType.SUCCESS, 50);
Command cmd2 = Command.from(groupKey, key, HystrixEventType.SUCCESS, 40);
cmd1.observe();
Thread.sleep(1);
cmd2.observe();
assertTrue(latch.await(10000, TimeUnit.MILLISECONDS));
assertEquals(2, stream.getLatestRollingMax());
}
代码示例来源:origin: PipelineAI/pipeline
@Test
public void testSingleFailure() {
HystrixCommandKey key = HystrixCommandKey.Factory.asKey("CMD-Health-C");
stream = HealthCountsStream.getInstance(key, 10, 100);
final CountDownLatch latch = new CountDownLatch(1);
stream.observe().take(10).subscribe(getSubscriber(latch));
CommandStreamTest.Command cmd = CommandStreamTest.Command.from(groupKey, key, HystrixEventType.FAILURE, 20);
cmd.observe();
try {
assertTrue(latch.await(10000, TimeUnit.MILLISECONDS));
} catch (InterruptedException ex) {
fail("Interrupted ex");
}
System.out.println("ReqLog : " + HystrixRequestLog.getCurrentRequest().getExecutedCommandsAsString());
assertEquals(1L, stream.getLatest().getErrorCount());
assertEquals(1L, stream.getLatest().getTotalRequests());
}
内容来源于网络,如有侵权,请联系作者删除!