rx.Observable.take()方法的使用及代码示例

x33g5p2x  于2022-01-25 转载在 其他  
字(15.0k)|赞(0)|评价(0)|浏览(342)

本文整理了Java中rx.Observable.take()方法的一些代码示例,展示了Observable.take()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Observable.take()方法的具体详情如下:
包路径:rx.Observable
类名称:Observable
方法名:take

Observable.take介绍

[英]Returns an Observable that emits only the first num items emitted by the source Observable.

This method returns an Observable that will invoke a subscribing Observer's Observer#onNext function a maximum of num times before invoking Observer#onCompleted. Scheduler: This version of take does not operate by default on a particular Scheduler.
[中]返回仅发射源可观测项发射的前num项的可观测项。
此方法返回一个Observable,在调用Observator#onCompleted之前,该Observable将调用订阅观察器的Observator#onNext函数最多num次。调度程序:默认情况下,此版本的take不会在特定调度程序上运行。

代码示例

代码示例来源:origin: RichardWarburton/java-8-lambdas-exercises

public Observable<Artist> search(String searchedName,
                 String searchedNationality,
                 int maxResults) {

  return getSavedArtists()  // <1>
     .filter(name -> name.contains(searchedName)) // <2>
     .flatMap(this::lookupArtist) // <3>
     .filter(artist -> artist.getNationality() // <4>
                 .contains(searchedNationality))
     .take(maxResults); // <5>
}
  // END search

代码示例来源:origin: PipelineAI/pipeline

@Test
public void noEvents() throws InterruptedException {
  CountDownLatch commandLatch = new CountDownLatch(1);
  CountDownLatch threadPoolLatch = new CountDownLatch(1);
  Subscriber<HystrixCommandCompletion> commandSubscriber = getLatchedSubscriber(commandLatch);
  readCommandStream.observe().take(1).subscribe(commandSubscriber);
  Subscriber<HystrixCommandCompletion> threadPoolSubscriber = getLatchedSubscriber(threadPoolLatch);
  readThreadPoolStream.observe().take(1).subscribe(threadPoolSubscriber);
  //no writes
  assertFalse(commandLatch.await(1000, TimeUnit.MILLISECONDS));
  assertFalse(threadPoolLatch.await(1000, TimeUnit.MILLISECONDS));
}

代码示例来源:origin: PipelineAI/pipeline

@Test
public void noEvents() throws InterruptedException {
  CountDownLatch latch = new CountDownLatch(1);
  Subscriber<HystrixCommandCompletion> subscriber = getLatchedSubscriber(latch);
  commandStream.observe().take(1).subscribe(subscriber);
  //no writes
  assertFalse(latch.await(1000, TimeUnit.MILLISECONDS));
}

代码示例来源:origin: PipelineAI/pipeline

@Test
public void testSemaphoreIsolatedSuccess() throws Exception {
  CountDownLatch commandLatch = new CountDownLatch(1);
  CountDownLatch threadPoolLatch = new CountDownLatch(1);
  Subscriber<HystrixCommandCompletion> commandSubscriber = getLatchedSubscriber(commandLatch);
  readCommandStream.observe().take(1).subscribe(commandSubscriber);
  Subscriber<HystrixCommandCompletion> threadPoolSubscriber = getLatchedSubscriber(threadPoolLatch);
  readThreadPoolStream.observe().take(1).subscribe(threadPoolSubscriber);
  ExecutionResult result = ExecutionResult.from(HystrixEventType.SUCCESS);
  writeToStream.executionDone(result, commandKey, threadPoolKey);
  assertTrue(commandLatch.await(1000, TimeUnit.MILLISECONDS));
  assertFalse(threadPoolLatch.await(1000, TimeUnit.MILLISECONDS));
}

代码示例来源:origin: PipelineAI/pipeline

@Test
public void testSemaphoreIsolatedTimeout() throws Exception {
  CountDownLatch commandLatch = new CountDownLatch(1);
  CountDownLatch threadPoolLatch = new CountDownLatch(1);
  Subscriber<HystrixCommandCompletion> commandSubscriber = getLatchedSubscriber(commandLatch);
  readCommandStream.observe().take(1).subscribe(commandSubscriber);
  Subscriber<HystrixCommandCompletion> threadPoolSubscriber = getLatchedSubscriber(threadPoolLatch);
  readThreadPoolStream.observe().take(1).subscribe(threadPoolSubscriber);
  ExecutionResult result = ExecutionResult.from(HystrixEventType.TIMEOUT);
  writeToStream.executionDone(result, commandKey, threadPoolKey);
  assertTrue(commandLatch.await(1000, TimeUnit.MILLISECONDS));
  assertFalse(threadPoolLatch.await(1000, TimeUnit.MILLISECONDS));
}

代码示例来源:origin: PipelineAI/pipeline

@Test
public void testSemaphoreRejectedCommand() throws Exception {
  CountDownLatch commandLatch = new CountDownLatch(1);
  CountDownLatch threadPoolLatch = new CountDownLatch(1);
  Subscriber<HystrixCommandCompletion> commandSubscriber = getLatchedSubscriber(commandLatch);
  readCommandStream.observe().take(1).subscribe(commandSubscriber);
  Subscriber<HystrixCommandCompletion> threadPoolSubscriber = getLatchedSubscriber(threadPoolLatch);
  readThreadPoolStream.observe().take(1).subscribe(threadPoolSubscriber);
  ExecutionResult result = ExecutionResult.from(HystrixEventType.SEMAPHORE_REJECTED);
  writeToStream.executionDone(result, commandKey, threadPoolKey);
  assertTrue(commandLatch.await(1000, TimeUnit.MILLISECONDS));
  assertFalse(threadPoolLatch.await(1000, TimeUnit.MILLISECONDS));
}

代码示例来源:origin: PipelineAI/pipeline

@Test
public void testThreadRejectedCommand() throws Exception {
  CountDownLatch commandLatch = new CountDownLatch(1);
  CountDownLatch threadPoolLatch = new CountDownLatch(1);
  Subscriber<HystrixCommandCompletion> commandSubscriber = getLatchedSubscriber(commandLatch);
  readCommandStream.observe().take(1).subscribe(commandSubscriber);
  Subscriber<HystrixCommandCompletion> threadPoolSubscriber = getLatchedSubscriber(threadPoolLatch);
  readThreadPoolStream.observe().take(1).subscribe(threadPoolSubscriber);
  ExecutionResult result = ExecutionResult.from(HystrixEventType.THREAD_POOL_REJECTED);
  writeToStream.executionDone(result, commandKey, threadPoolKey);
  assertTrue(commandLatch.await(1000, TimeUnit.MILLISECONDS));
  assertTrue(threadPoolLatch.await(1000, TimeUnit.MILLISECONDS));
}

代码示例来源:origin: PipelineAI/pipeline

@Test
public void testSemaphoreIsolatedFailure() throws Exception {
  CountDownLatch commandLatch = new CountDownLatch(1);
  CountDownLatch threadPoolLatch = new CountDownLatch(1);
  Subscriber<HystrixCommandCompletion> commandSubscriber = getLatchedSubscriber(commandLatch);
  readCommandStream.observe().take(1).subscribe(commandSubscriber);
  Subscriber<HystrixCommandCompletion> threadPoolSubscriber = getLatchedSubscriber(threadPoolLatch);
  readThreadPoolStream.observe().take(1).subscribe(threadPoolSubscriber);
  ExecutionResult result = ExecutionResult.from(HystrixEventType.FAILURE);
  writeToStream.executionDone(result, commandKey, threadPoolKey);
  assertTrue(commandLatch.await(1000, TimeUnit.MILLISECONDS));
  assertFalse(threadPoolLatch.await(1000, TimeUnit.MILLISECONDS));
}

代码示例来源:origin: PipelineAI/pipeline

@Test
public void testSemaphoreIsolatedBadRequest() throws Exception {
  CountDownLatch commandLatch = new CountDownLatch(1);
  CountDownLatch threadPoolLatch = new CountDownLatch(1);
  Subscriber<HystrixCommandCompletion> commandSubscriber = getLatchedSubscriber(commandLatch);
  readCommandStream.observe().take(1).subscribe(commandSubscriber);
  Subscriber<HystrixCommandCompletion> threadPoolSubscriber = getLatchedSubscriber(threadPoolLatch);
  readThreadPoolStream.observe().take(1).subscribe(threadPoolSubscriber);
  ExecutionResult result = ExecutionResult.from(HystrixEventType.BAD_REQUEST);
  writeToStream.executionDone(result, commandKey, threadPoolKey);
  assertTrue(commandLatch.await(1000, TimeUnit.MILLISECONDS));
  assertFalse(threadPoolLatch.await(1000, TimeUnit.MILLISECONDS));
}

代码示例来源:origin: PipelineAI/pipeline

@Test
public void testThreadIsolatedSuccess() throws InterruptedException {
  CountDownLatch commandLatch = new CountDownLatch(1);
  CountDownLatch threadPoolLatch = new CountDownLatch(1);
  Subscriber<HystrixCommandCompletion> commandSubscriber = getLatchedSubscriber(commandLatch);
  readCommandStream.observe().take(1).subscribe(commandSubscriber);
  Subscriber<HystrixCommandCompletion> threadPoolSubscriber = getLatchedSubscriber(threadPoolLatch);
  readThreadPoolStream.observe().take(1).subscribe(threadPoolSubscriber);
  ExecutionResult result = ExecutionResult.from(HystrixEventType.SUCCESS).setExecutedInThread();
  writeToStream.executionDone(result, commandKey, threadPoolKey);
  assertTrue(commandLatch.await(1000, TimeUnit.MILLISECONDS));
  assertTrue(threadPoolLatch.await(1000, TimeUnit.MILLISECONDS));
}

代码示例来源:origin: PipelineAI/pipeline

@Test
  public void testShortCircuit() throws Exception {
    CountDownLatch commandLatch = new CountDownLatch(1);
    CountDownLatch threadPoolLatch = new CountDownLatch(1);

    Subscriber<HystrixCommandCompletion> commandSubscriber = getLatchedSubscriber(commandLatch);
    readCommandStream.observe().take(1).subscribe(commandSubscriber);

    Subscriber<HystrixCommandCompletion> threadPoolSubscriber = getLatchedSubscriber(threadPoolLatch);
    readThreadPoolStream.observe().take(1).subscribe(threadPoolSubscriber);

    ExecutionResult result = ExecutionResult.from(HystrixEventType.SHORT_CIRCUITED);
    writeToStream.executionDone(result, commandKey, threadPoolKey);

    assertTrue(commandLatch.await(1000, TimeUnit.MILLISECONDS));
    assertFalse(threadPoolLatch.await(1000, TimeUnit.MILLISECONDS));
  }
}

代码示例来源:origin: PipelineAI/pipeline

@Test
public void testThreadIsolatedFailure() throws Exception {
  CountDownLatch commandLatch = new CountDownLatch(1);
  CountDownLatch threadPoolLatch = new CountDownLatch(1);
  Subscriber<HystrixCommandCompletion> commandSubscriber = getLatchedSubscriber(commandLatch);
  readCommandStream.observe().take(1).subscribe(commandSubscriber);
  Subscriber<HystrixCommandCompletion> threadPoolSubscriber = getLatchedSubscriber(threadPoolLatch);
  readThreadPoolStream.observe().take(1).subscribe(threadPoolSubscriber);
  ExecutionResult result = ExecutionResult.from(HystrixEventType.FAILURE).setExecutedInThread();
  writeToStream.executionDone(result, commandKey, threadPoolKey);
  assertTrue(commandLatch.await(1000, TimeUnit.MILLISECONDS));
  assertTrue(threadPoolLatch.await(1000, TimeUnit.MILLISECONDS));
}

代码示例来源:origin: PipelineAI/pipeline

@Test
public void testThreadIsolatedTimeout() throws Exception {
  CountDownLatch commandLatch = new CountDownLatch(1);
  CountDownLatch threadPoolLatch = new CountDownLatch(1);
  Subscriber<HystrixCommandCompletion> commandSubscriber = getLatchedSubscriber(commandLatch);
  readCommandStream.observe().take(1).subscribe(commandSubscriber);
  Subscriber<HystrixCommandCompletion> threadPoolSubscriber = getLatchedSubscriber(threadPoolLatch);
  readThreadPoolStream.observe().take(1).subscribe(threadPoolSubscriber);
  ExecutionResult result = ExecutionResult.from(HystrixEventType.TIMEOUT).setExecutedInThread();
  writeToStream.executionDone(result, commandKey, threadPoolKey);
  assertTrue(commandLatch.await(1000, TimeUnit.MILLISECONDS));
  assertTrue(threadPoolLatch.await(1000, TimeUnit.MILLISECONDS));
}

代码示例来源:origin: PipelineAI/pipeline

@Test
public void testThreadIsolatedBadRequest() throws Exception {
  CountDownLatch commandLatch = new CountDownLatch(1);
  CountDownLatch threadPoolLatch = new CountDownLatch(1);
  Subscriber<HystrixCommandCompletion> commandSubscriber = getLatchedSubscriber(commandLatch);
  readCommandStream.observe().take(1).subscribe(commandSubscriber);
  Subscriber<HystrixCommandCompletion> threadPoolSubscriber = getLatchedSubscriber(threadPoolLatch);
  readThreadPoolStream.observe().take(1).subscribe(threadPoolSubscriber);
  ExecutionResult result = ExecutionResult.from(HystrixEventType.BAD_REQUEST).setExecutedInThread();
  writeToStream.executionDone(result, commandKey, threadPoolKey);
  assertTrue(commandLatch.await(1000, TimeUnit.MILLISECONDS));
  assertTrue(threadPoolLatch.await(1000, TimeUnit.MILLISECONDS));
}

代码示例来源:origin: PipelineAI/pipeline

@Test
  public void testSingleWriteMultipleSubscribers() throws InterruptedException {
    CountDownLatch latch1 = new CountDownLatch(1);
    Subscriber<HystrixCommandCompletion> subscriber1 = getLatchedSubscriber(latch1);

    CountDownLatch latch2 = new CountDownLatch(1);
    Subscriber<HystrixCommandCompletion> subscriber2 = getLatchedSubscriber(latch2);

    commandStream.observe().take(1).subscribe(subscriber1);
    commandStream.observe().take(1).subscribe(subscriber2);

    ExecutionResult result = ExecutionResult.from(HystrixEventType.SUCCESS).setExecutedInThread();
    HystrixCommandCompletion event = HystrixCommandCompletion.from(result, commandKey, threadPoolKey);
    commandStream.write(event);

    assertTrue(latch1.await(1000, TimeUnit.MILLISECONDS));
    assertTrue(latch2.await(10, TimeUnit.MILLISECONDS));
  }
}

代码示例来源:origin: PipelineAI/pipeline

@Test
public void testSingleWriteSingleSubscriber() throws InterruptedException {
  CountDownLatch latch = new CountDownLatch(1);
  Subscriber<HystrixCommandCompletion> subscriber = getLatchedSubscriber(latch);
  commandStream.observe().take(1).subscribe(subscriber);
  ExecutionResult result = ExecutionResult.from(HystrixEventType.SUCCESS).setExecutedInThread();
  HystrixCommandCompletion event = HystrixCommandCompletion.from(result, commandKey, threadPoolKey);
  commandStream.write(event);
  assertTrue(latch.await(1000, TimeUnit.MILLISECONDS));
}

代码示例来源:origin: PipelineAI/pipeline

@Test
public void testEmptyStreamProducesZeros() {
  HystrixCommandKey key = HystrixCommandKey.Factory.asKey("CMD-Concurrency-A");
  stream = RollingCommandMaxConcurrencyStream.getInstance(key, 10, 500);
  stream.startCachingStreamValuesIfUnstarted();
  final CountDownLatch latch = new CountDownLatch(1);
  stream.observe().take(5).subscribe(getSubscriber(latch));
  //no writes
  try {
    assertTrue(latch.await(10000, TimeUnit.MILLISECONDS));
  } catch (InterruptedException ex) {
    fail("Interrupted ex");
  }
  assertEquals(0, stream.getLatestRollingMax());
}

代码示例来源:origin: PipelineAI/pipeline

@Test
public void testStartsAndEndsInSameBucketProduceValue() throws InterruptedException {
  HystrixCommandKey key = HystrixCommandKey.Factory.asKey("CMD-Concurrency-B");
  stream = RollingCommandMaxConcurrencyStream.getInstance(key, 10, 500);
  stream.startCachingStreamValuesIfUnstarted();
  final CountDownLatch latch = new CountDownLatch(1);
  stream.observe().take(5).subscribe(getSubscriber(latch));
  Command cmd1 = Command.from(groupKey, key, HystrixEventType.SUCCESS, 100);
  Command cmd2 = Command.from(groupKey, key, HystrixEventType.SUCCESS, 100);
  cmd1.observe();
  Thread.sleep(1);
  cmd2.observe();
  assertTrue(latch.await(10000, TimeUnit.MILLISECONDS));
  assertEquals(2, stream.getLatestRollingMax());
}

代码示例来源:origin: PipelineAI/pipeline

@Test
public void testStartsAndEndsInSameBucketProduceValue() throws InterruptedException {
  HystrixCommandGroupKey groupKey = HystrixCommandGroupKey.Factory.asKey("ThreadPool-Concurrency-B");
  HystrixThreadPoolKey threadPoolKey = HystrixThreadPoolKey.Factory.asKey("ThreadPool-Concurrency-B");
  HystrixCommandKey key = HystrixCommandKey.Factory.asKey("RollingConcurrency-B");
  stream = RollingThreadPoolMaxConcurrencyStream.getInstance(threadPoolKey, 10, 100);
  stream.startCachingStreamValuesIfUnstarted();
  final CountDownLatch latch = new CountDownLatch(1);
  stream.observe().take(10).subscribe(getSubscriber(latch));
  Command cmd1 = Command.from(groupKey, key, HystrixEventType.SUCCESS, 50);
  Command cmd2 = Command.from(groupKey, key, HystrixEventType.SUCCESS, 40);
  cmd1.observe();
  Thread.sleep(1);
  cmd2.observe();
  assertTrue(latch.await(10000, TimeUnit.MILLISECONDS));
  assertEquals(2, stream.getLatestRollingMax());
}

代码示例来源:origin: PipelineAI/pipeline

@Test
public void testSingleFailure() {
  HystrixCommandKey key = HystrixCommandKey.Factory.asKey("CMD-Health-C");
  stream = HealthCountsStream.getInstance(key, 10, 100);
  final CountDownLatch latch = new CountDownLatch(1);
  stream.observe().take(10).subscribe(getSubscriber(latch));
  CommandStreamTest.Command cmd = CommandStreamTest.Command.from(groupKey, key, HystrixEventType.FAILURE, 20);
  cmd.observe();
  try {
    assertTrue(latch.await(10000, TimeUnit.MILLISECONDS));
  } catch (InterruptedException ex) {
    fail("Interrupted ex");
  }
  System.out.println("ReqLog : " + HystrixRequestLog.getCurrentRequest().getExecutedCommandsAsString());
  assertEquals(1L, stream.getLatest().getErrorCount());
  assertEquals(1L, stream.getLatest().getTotalRequests());
}

相关文章

Observable类方法