Using the rx.Observable.take() method, with code examples

x33g5p2x, reposted on 2022-01-25 in Other

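This article collects code examples for the rx.Observable.take() method in Java and shows how Observable.take() is used in practice. The examples come mainly from GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they should serve as a useful reference. Details of the Observable.take() method:
Package path: rx.Observable
Class name: Observable
Method name: take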

About Observable.take

Returns an Observable that emits only the first num items emitted by the source Observable.

This method returns an Observable that will invoke a subscribing Observer's Observer#onNext function a maximum of num times before invoking Observer#onCompleted. Scheduler: This version of take does not operate by default on a particular Scheduler.
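
The contract described above (at most num calls to onNext, followed by a single onCompleted) can be illustrated with a small plain-Java sketch. This is a simplified model and not RxJava's implementation: the names TakeSketch, SimpleObserver, and demo are hypothetical, the source is an ordinary Iterable rather than an Observable, and error handling and unsubscription are left out.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of the take(num) contract. All names here are hypothetical
// and exist only for illustration; this is not how RxJava implements take.
final class TakeSketch {

    // Minimal stand-in for an observer with only the two callbacks we need.
    interface SimpleObserver<T> {
        void onNext(T item);
        void onCompleted();
    }

    // Forwards at most num items from the source, then signals completion once.
    static <T> void take(Iterable<T> source, int num, SimpleObserver<T> observer) {
        int emitted = 0;
        if (num > 0) {
            for (T item : source) {
                observer.onNext(item);
                emitted++;
                if (emitted == num) {
                    break; // stop reading the source once the limit is reached
                }
            }
        }
        observer.onCompleted();
    }

    // Runs take over the integers 1..sourceSize and records every callback.
    static List<String> demo(int sourceSize, int num) {
        List<Integer> source = new ArrayList<>();
        for (int i = 1; i <= sourceSize; i++) {
            source.add(i);
        }
        List<String> events = new ArrayList<>();
        take(source, num, new SimpleObserver<Integer>() {
            @Override public void onNext(Integer item) { events.add("onNext(" + item + ")"); }
            @Override public void onCompleted() { events.add("onCompleted"); }
        });
        return events;
    }

    public static void main(String[] args) {
        System.out.println(demo(10, 3)); // [onNext(1), onNext(2), onNext(3), onCompleted]
        System.out.println(demo(2, 5));  // [onNext(1), onNext(2), onCompleted]
    }
}
```

In this model a source shorter than num simply completes early, which is why demo(2, 5) records only two onNext calls.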

Code examples

Code example source: RichardWarburton/java-8-lambdas-exercises

    public Observable<Artist> search(String searchedName,
                                     String searchedNationality,
                                     int maxResults) {
        return getSavedArtists()                           // start from the saved artist names
            .filter(name -> name.contains(searchedName))   // keep names that match the query
            .flatMap(this::lookupArtist)                   // look up the Artist for each name
            .filter(artist -> artist.getNationality()      // keep artists with a matching nationality
                .contains(searchedNationality))
            .take(maxResults);                             // emit at most maxResults artists
    }

Code example source: PipelineAI/pipeline

    @Test
    public void noEvents() throws InterruptedException {
        CountDownLatch commandLatch = new CountDownLatch(1);
        CountDownLatch threadPoolLatch = new CountDownLatch(1);
        // take(1) limits each subscription to the first event, if one ever arrives
        Subscriber<HystrixCommandCompletion> commandSubscriber = getLatchedSubscriber(commandLatch);
        readCommandStream.observe().take(1).subscribe(commandSubscriber);
        Subscriber<HystrixCommandCompletion> threadPoolSubscriber = getLatchedSubscriber(threadPoolLatch);
        readThreadPoolStream.observe().take(1).subscribe(threadPoolSubscriber);
        // no writes, so neither latch is expected to be released
        assertFalse(commandLatch.await(1000, TimeUnit.MILLISECONDS));
        assertFalse(threadPoolLatch.await(1000, TimeUnit.MILLISECONDS));
    }

Code example source: PipelineAI/pipeline

    @Test
    public void noEvents() throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);
        Subscriber<HystrixCommandCompletion> subscriber = getLatchedSubscriber(latch);
        commandStream.observe().take(1).subscribe(subscriber);
        // no writes, so the latch is expected to time out
        assertFalse(latch.await(1000, TimeUnit.MILLISECONDS));
    }

Code example source: PipelineAI/pipeline

    @Test
    public void testSemaphoreIsolatedSuccess() throws Exception {
        CountDownLatch commandLatch = new CountDownLatch(1);
        CountDownLatch threadPoolLatch = new CountDownLatch(1);
        Subscriber<HystrixCommandCompletion> commandSubscriber = getLatchedSubscriber(commandLatch);
        readCommandStream.observe().take(1).subscribe(commandSubscriber);
        Subscriber<HystrixCommandCompletion> threadPoolSubscriber = getLatchedSubscriber(threadPoolLatch);
        readThreadPoolStream.observe().take(1).subscribe(threadPoolSubscriber);
        ExecutionResult result = ExecutionResult.from(HystrixEventType.SUCCESS);
        writeToStream.executionDone(result, commandKey, threadPoolKey);
        // expected: the command stream emits, the thread-pool stream stays silent
        assertTrue(commandLatch.await(1000, TimeUnit.MILLISECONDS));
        assertFalse(threadPoolLatch.await(1000, TimeUnit.MILLISECONDS));
    }

Code example source: PipelineAI/pipeline

    @Test
    public void testSemaphoreIsolatedTimeout() throws Exception {
        CountDownLatch commandLatch = new CountDownLatch(1);
        CountDownLatch threadPoolLatch = new CountDownLatch(1);
        Subscriber<HystrixCommandCompletion> commandSubscriber = getLatchedSubscriber(commandLatch);
        readCommandStream.observe().take(1).subscribe(commandSubscriber);
        Subscriber<HystrixCommandCompletion> threadPoolSubscriber = getLatchedSubscriber(threadPoolLatch);
        readThreadPoolStream.observe().take(1).subscribe(threadPoolSubscriber);
        ExecutionResult result = ExecutionResult.from(HystrixEventType.TIMEOUT);
        writeToStream.executionDone(result, commandKey, threadPoolKey);
        // expected: the command stream emits, the thread-pool stream stays silent
        assertTrue(commandLatch.await(1000, TimeUnit.MILLISECONDS));
        assertFalse(threadPoolLatch.await(1000, TimeUnit.MILLISECONDS));
    }

Code example source: PipelineAI/pipeline

    @Test
    public void testSemaphoreRejectedCommand() throws Exception {
        CountDownLatch commandLatch = new CountDownLatch(1);
        CountDownLatch threadPoolLatch = new CountDownLatch(1);
        Subscriber<HystrixCommandCompletion> commandSubscriber = getLatchedSubscriber(commandLatch);
        readCommandStream.observe().take(1).subscribe(commandSubscriber);
        Subscriber<HystrixCommandCompletion> threadPoolSubscriber = getLatchedSubscriber(threadPoolLatch);
        readThreadPoolStream.observe().take(1).subscribe(threadPoolSubscriber);
        ExecutionResult result = ExecutionResult.from(HystrixEventType.SEMAPHORE_REJECTED);
        writeToStream.executionDone(result, commandKey, threadPoolKey);
        // expected: the command stream emits, the thread-pool stream stays silent
        assertTrue(commandLatch.await(1000, TimeUnit.MILLISECONDS));
        assertFalse(threadPoolLatch.await(1000, TimeUnit.MILLISECONDS));
    }

Code example source: PipelineAI/pipeline

    @Test
    public void testThreadRejectedCommand() throws Exception {
        CountDownLatch commandLatch = new CountDownLatch(1);
        CountDownLatch threadPoolLatch = new CountDownLatch(1);
        Subscriber<HystrixCommandCompletion> commandSubscriber = getLatchedSubscriber(commandLatch);
        readCommandStream.observe().take(1).subscribe(commandSubscriber);
        Subscriber<HystrixCommandCompletion> threadPoolSubscriber = getLatchedSubscriber(threadPoolLatch);
        readThreadPoolStream.observe().take(1).subscribe(threadPoolSubscriber);
        ExecutionResult result = ExecutionResult.from(HystrixEventType.THREAD_POOL_REJECTED);
        writeToStream.executionDone(result, commandKey, threadPoolKey);
        // expected: both the command stream and the thread-pool stream emit
        assertTrue(commandLatch.await(1000, TimeUnit.MILLISECONDS));
        assertTrue(threadPoolLatch.await(1000, TimeUnit.MILLISECONDS));
    }

Code example source: PipelineAI/pipeline

    @Test
    public void testSemaphoreIsolatedFailure() throws Exception {
        CountDownLatch commandLatch = new CountDownLatch(1);
        CountDownLatch threadPoolLatch = new CountDownLatch(1);
        Subscriber<HystrixCommandCompletion> commandSubscriber = getLatchedSubscriber(commandLatch);
        readCommandStream.observe().take(1).subscribe(commandSubscriber);
        Subscriber<HystrixCommandCompletion> threadPoolSubscriber = getLatchedSubscriber(threadPoolLatch);
        readThreadPoolStream.observe().take(1).subscribe(threadPoolSubscriber);
        ExecutionResult result = ExecutionResult.from(HystrixEventType.FAILURE);
        writeToStream.executionDone(result, commandKey, threadPoolKey);
        // expected: the command stream emits, the thread-pool stream stays silent
        assertTrue(commandLatch.await(1000, TimeUnit.MILLISECONDS));
        assertFalse(threadPoolLatch.await(1000, TimeUnit.MILLISECONDS));
    }

Code example source: PipelineAI/pipeline

    @Test
    public void testSemaphoreIsolatedBadRequest() throws Exception {
        CountDownLatch commandLatch = new CountDownLatch(1);
        CountDownLatch threadPoolLatch = new CountDownLatch(1);
        Subscriber<HystrixCommandCompletion> commandSubscriber = getLatchedSubscriber(commandLatch);
        readCommandStream.observe().take(1).subscribe(commandSubscriber);
        Subscriber<HystrixCommandCompletion> threadPoolSubscriber = getLatchedSubscriber(threadPoolLatch);
        readThreadPoolStream.observe().take(1).subscribe(threadPoolSubscriber);
        ExecutionResult result = ExecutionResult.from(HystrixEventType.BAD_REQUEST);
        writeToStream.executionDone(result, commandKey, threadPoolKey);
        // expected: the command stream emits, the thread-pool stream stays silent
        assertTrue(commandLatch.await(1000, TimeUnit.MILLISECONDS));
        assertFalse(threadPoolLatch.await(1000, TimeUnit.MILLISECONDS));
    }

Code example source: PipelineAI/pipeline

    @Test
    public void testThreadIsolatedSuccess() throws InterruptedException {
        CountDownLatch commandLatch = new CountDownLatch(1);
        CountDownLatch threadPoolLatch = new CountDownLatch(1);
        Subscriber<HystrixCommandCompletion> commandSubscriber = getLatchedSubscriber(commandLatch);
        readCommandStream.observe().take(1).subscribe(commandSubscriber);
        Subscriber<HystrixCommandCompletion> threadPoolSubscriber = getLatchedSubscriber(threadPoolLatch);
        readThreadPoolStream.observe().take(1).subscribe(threadPoolSubscriber);
        // mark the result as executed in a thread
        ExecutionResult result = ExecutionResult.from(HystrixEventType.SUCCESS).setExecutedInThread();
        writeToStream.executionDone(result, commandKey, threadPoolKey);
        // expected: both the command stream and the thread-pool stream emit
        assertTrue(commandLatch.await(1000, TimeUnit.MILLISECONDS));
        assertTrue(threadPoolLatch.await(1000, TimeUnit.MILLISECONDS));
    }

Code example source: PipelineAI/pipeline

    @Test
    public void testShortCircuit() throws Exception {
        CountDownLatch commandLatch = new CountDownLatch(1);
        CountDownLatch threadPoolLatch = new CountDownLatch(1);
        Subscriber<HystrixCommandCompletion> commandSubscriber = getLatchedSubscriber(commandLatch);
        readCommandStream.observe().take(1).subscribe(commandSubscriber);
        Subscriber<HystrixCommandCompletion> threadPoolSubscriber = getLatchedSubscriber(threadPoolLatch);
        readThreadPoolStream.observe().take(1).subscribe(threadPoolSubscriber);
        ExecutionResult result = ExecutionResult.from(HystrixEventType.SHORT_CIRCUITED);
        writeToStream.executionDone(result, commandKey, threadPoolKey);
        // expected: the command stream emits, the thread-pool stream stays silent
        assertTrue(commandLatch.await(1000, TimeUnit.MILLISECONDS));
        assertFalse(threadPoolLatch.await(1000, TimeUnit.MILLISECONDS));
    }

Code example source: PipelineAI/pipeline

    @Test
    public void testThreadIsolatedFailure() throws Exception {
        CountDownLatch commandLatch = new CountDownLatch(1);
        CountDownLatch threadPoolLatch = new CountDownLatch(1);
        Subscriber<HystrixCommandCompletion> commandSubscriber = getLatchedSubscriber(commandLatch);
        readCommandStream.observe().take(1).subscribe(commandSubscriber);
        Subscriber<HystrixCommandCompletion> threadPoolSubscriber = getLatchedSubscriber(threadPoolLatch);
        readThreadPoolStream.observe().take(1).subscribe(threadPoolSubscriber);
        // mark the result as executed in a thread
        ExecutionResult result = ExecutionResult.from(HystrixEventType.FAILURE).setExecutedInThread();
        writeToStream.executionDone(result, commandKey, threadPoolKey);
        // expected: both the command stream and the thread-pool stream emit
        assertTrue(commandLatch.await(1000, TimeUnit.MILLISECONDS));
        assertTrue(threadPoolLatch.await(1000, TimeUnit.MILLISECONDS));
    }

Code example source: PipelineAI/pipeline

    @Test
    public void testThreadIsolatedTimeout() throws Exception {
        CountDownLatch commandLatch = new CountDownLatch(1);
        CountDownLatch threadPoolLatch = new CountDownLatch(1);
        Subscriber<HystrixCommandCompletion> commandSubscriber = getLatchedSubscriber(commandLatch);
        readCommandStream.observe().take(1).subscribe(commandSubscriber);
        Subscriber<HystrixCommandCompletion> threadPoolSubscriber = getLatchedSubscriber(threadPoolLatch);
        readThreadPoolStream.observe().take(1).subscribe(threadPoolSubscriber);
        // mark the result as executed in a thread
        ExecutionResult result = ExecutionResult.from(HystrixEventType.TIMEOUT).setExecutedInThread();
        writeToStream.executionDone(result, commandKey, threadPoolKey);
        // expected: both the command stream and the thread-pool stream emit
        assertTrue(commandLatch.await(1000, TimeUnit.MILLISECONDS));
        assertTrue(threadPoolLatch.await(1000, TimeUnit.MILLISECONDS));
    }

Code example source: PipelineAI/pipeline

    @Test
    public void testThreadIsolatedBadRequest() throws Exception {
        CountDownLatch commandLatch = new CountDownLatch(1);
        CountDownLatch threadPoolLatch = new CountDownLatch(1);
        Subscriber<HystrixCommandCompletion> commandSubscriber = getLatchedSubscriber(commandLatch);
        readCommandStream.observe().take(1).subscribe(commandSubscriber);
        Subscriber<HystrixCommandCompletion> threadPoolSubscriber = getLatchedSubscriber(threadPoolLatch);
        readThreadPoolStream.observe().take(1).subscribe(threadPoolSubscriber);
        // mark the result as executed in a thread
        ExecutionResult result = ExecutionResult.from(HystrixEventType.BAD_REQUEST).setExecutedInThread();
        writeToStream.executionDone(result, commandKey, threadPoolKey);
        // expected: both the command stream and the thread-pool stream emit
        assertTrue(commandLatch.await(1000, TimeUnit.MILLISECONDS));
        assertTrue(threadPoolLatch.await(1000, TimeUnit.MILLISECONDS));
    }

Code example source: PipelineAI/pipeline

    @Test
    public void testSingleWriteMultipleSubscribers() throws InterruptedException {
        CountDownLatch latch1 = new CountDownLatch(1);
        Subscriber<HystrixCommandCompletion> subscriber1 = getLatchedSubscriber(latch1);
        CountDownLatch latch2 = new CountDownLatch(1);
        Subscriber<HystrixCommandCompletion> subscriber2 = getLatchedSubscriber(latch2);
        // two independent subscriptions, each limited to the first event
        commandStream.observe().take(1).subscribe(subscriber1);
        commandStream.observe().take(1).subscribe(subscriber2);
        ExecutionResult result = ExecutionResult.from(HystrixEventType.SUCCESS).setExecutedInThread();
        HystrixCommandCompletion event = HystrixCommandCompletion.from(result, commandKey, threadPoolKey);
        commandStream.write(event);
        // a single write is expected to reach both subscribers
        assertTrue(latch1.await(1000, TimeUnit.MILLISECONDS));
        assertTrue(latch2.await(10, TimeUnit.MILLISECONDS));
    }

Code example source: PipelineAI/pipeline

    @Test
    public void testSingleWriteSingleSubscriber() throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);
        Subscriber<HystrixCommandCompletion> subscriber = getLatchedSubscriber(latch);
        commandStream.observe().take(1).subscribe(subscriber);
        ExecutionResult result = ExecutionResult.from(HystrixEventType.SUCCESS).setExecutedInThread();
        HystrixCommandCompletion event = HystrixCommandCompletion.from(result, commandKey, threadPoolKey);
        commandStream.write(event);
        // the single write is expected to release the latch
        assertTrue(latch.await(1000, TimeUnit.MILLISECONDS));
    }

Code example source: PipelineAI/pipeline

    @Test
    public void testEmptyStreamProducesZeros() {
        HystrixCommandKey key = HystrixCommandKey.Factory.asKey("CMD-Concurrency-A");
        stream = RollingCommandMaxConcurrencyStream.getInstance(key, 10, 500);
        stream.startCachingStreamValuesIfUnstarted();
        final CountDownLatch latch = new CountDownLatch(1);
        // observe only the first 5 values emitted by the stream
        stream.observe().take(5).subscribe(getSubscriber(latch));
        // no writes
        try {
            assertTrue(latch.await(10000, TimeUnit.MILLISECONDS));
        } catch (InterruptedException ex) {
            fail("Interrupted ex");
        }
        assertEquals(0, stream.getLatestRollingMax());
    }

Code example source: PipelineAI/pipeline

    @Test
    public void testStartsAndEndsInSameBucketProduceValue() throws InterruptedException {
        HystrixCommandKey key = HystrixCommandKey.Factory.asKey("CMD-Concurrency-B");
        stream = RollingCommandMaxConcurrencyStream.getInstance(key, 10, 500);
        stream.startCachingStreamValuesIfUnstarted();
        final CountDownLatch latch = new CountDownLatch(1);
        // observe only the first 5 values emitted by the stream
        stream.observe().take(5).subscribe(getSubscriber(latch));
        Command cmd1 = Command.from(groupKey, key, HystrixEventType.SUCCESS, 100);
        Command cmd2 = Command.from(groupKey, key, HystrixEventType.SUCCESS, 100);
        cmd1.observe();
        Thread.sleep(1);
        cmd2.observe();
        assertTrue(latch.await(10000, TimeUnit.MILLISECONDS));
        // both commands overlap, so the rolling max concurrency should be 2
        assertEquals(2, stream.getLatestRollingMax());
    }

Code example source: PipelineAI/pipeline

    @Test
    public void testStartsAndEndsInSameBucketProduceValue() throws InterruptedException {
        HystrixCommandGroupKey groupKey = HystrixCommandGroupKey.Factory.asKey("ThreadPool-Concurrency-B");
        HystrixThreadPoolKey threadPoolKey = HystrixThreadPoolKey.Factory.asKey("ThreadPool-Concurrency-B");
        HystrixCommandKey key = HystrixCommandKey.Factory.asKey("RollingConcurrency-B");
        stream = RollingThreadPoolMaxConcurrencyStream.getInstance(threadPoolKey, 10, 100);
        stream.startCachingStreamValuesIfUnstarted();
        final CountDownLatch latch = new CountDownLatch(1);
        // observe only the first 10 values emitted by the stream
        stream.observe().take(10).subscribe(getSubscriber(latch));
        Command cmd1 = Command.from(groupKey, key, HystrixEventType.SUCCESS, 50);
        Command cmd2 = Command.from(groupKey, key, HystrixEventType.SUCCESS, 40);
        cmd1.observe();
        Thread.sleep(1);
        cmd2.observe();
        assertTrue(latch.await(10000, TimeUnit.MILLISECONDS));
        // both commands overlap, so the rolling max concurrency should be 2
        assertEquals(2, stream.getLatestRollingMax());
    }

Code example source: PipelineAI/pipeline

    @Test
    public void testSingleFailure() {
        HystrixCommandKey key = HystrixCommandKey.Factory.asKey("CMD-Health-C");
        stream = HealthCountsStream.getInstance(key, 10, 100);
        final CountDownLatch latch = new CountDownLatch(1);
        // observe only the first 10 values emitted by the stream
        stream.observe().take(10).subscribe(getSubscriber(latch));
        CommandStreamTest.Command cmd = CommandStreamTest.Command.from(groupKey, key, HystrixEventType.FAILURE, 20);
        cmd.observe();
        try {
            assertTrue(latch.await(10000, TimeUnit.MILLISECONDS));
        } catch (InterruptedException ex) {
            fail("Interrupted ex");
        }
        System.out.println("ReqLog : " + HystrixRequestLog.getCurrentRequest().getExecutedCommandsAsString());
        // one failed command: one error out of one total request
        assertEquals(1L, stream.getLatest().getErrorCount());
        assertEquals(1L, stream.getLatest().getTotalRequests());
    }
