Usage and code examples for the com.oath.cyclops.async.adapters.Queue.jdkStream() method

x33g5p2x, reposted on 2022-01-28 under Other

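This article collects code examples for the Java method com.oath.cyclops.async.adapters.Queue.jdkStream and shows how Queue.jdkStream is used in practice. The examples come mainly from GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they should be useful as references. Details of the Queue.jdkStream method:
Package path: com.oath.cyclops.async.adapters.Queue
Class name: Queue
Method name: jdkStream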

About Queue.jdkStream

Returns a standard (unextended) JDK Stream connected to this Queue. To disconnect cleanly, close the queue.

Use queue.stream().parallel() to convert to a parallel Stream.
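The description above says the Stream stays connected until the queue is closed. The following is a minimal JDK-only sketch of that idea, not the Cyclops implementation: the class `PoisonPillQueueSketch`, its nested `SimpleQueue`, and the sentinel-based shutdown are hypothetical stand-ins chosen for illustration. It assumes that closing is signalled by placing a sentinel ("poison pill") on the queue, a term the Cyclops Javadoc itself uses.

```java
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

// Hypothetical JDK-only stand-in; this is NOT the Cyclops Queue class.
final class PoisonPillQueueSketch {

    static final class SimpleQueue<T> {
        private static final Object POISON_PILL = new Object();
        private final BlockingQueue<Object> delegate = new LinkedBlockingQueue<>();

        boolean offer(T value) {
            return delegate.offer(value);
        }

        // Closing enqueues the sentinel so a connected Stream can terminate.
        void close() {
            delegate.offer(POISON_PILL);
        }

        // A plain JDK Stream that blocks for elements until the sentinel arrives.
        Stream<T> jdkStream() {
            Iterator<T> it = new Iterator<T>() {
                private Object next;
                private boolean done;

                @Override
                public boolean hasNext() {
                    if (done) return false;
                    if (next == null) {
                        try {
                            next = delegate.take();
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                            done = true;
                            return false;
                        }
                        if (next == POISON_PILL) {
                            done = true;
                            next = null;
                            return false;
                        }
                    }
                    return true;
                }

                @SuppressWarnings("unchecked")
                @Override
                public T next() {
                    if (!hasNext()) throw new NoSuchElementException();
                    T value = (T) next;
                    next = null;
                    return value;
                }
            };
            return StreamSupport.stream(
                    Spliterators.spliteratorUnknownSize(
                            it, Spliterator.ORDERED | Spliterator.NONNULL),
                    false);
        }
    }

    static List<Integer> run() {
        SimpleQueue<Integer> queue = new SimpleQueue<>();
        queue.offer(1);
        queue.offer(2);
        queue.offer(3);
        queue.close(); // without this, the terminal operation would block forever
        return queue.jdkStream().map(i -> i * 10).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [10, 20, 30]
    }
}
```

In this sketch, forgetting to call close() leaves the collecting thread blocked in take(), which is the reason the description recommends closing the queue to disconnect cleanly.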

Code examples

Code example source: origin: aol/cyclops (the same code also appears under com.oath.cyclops/cyclops)

/**
 * Return a standard (unextended) JDK Stream connected to this Queue
 * To disconnect cleanly close the queue
 *
 * <pre>
 * {@code
 *        use queue.stream().parallel() to convert to a parallel Stream
 *  }
 * </pre>
 * @see Queue#jdkStream(int) for an alternative that sends more poison pills for use with parallel Streams.
 *
 * @return Java 8 Stream connected to this Queue
 */
public Stream<T> jdkStream() {
  return jdkStream(2);
}
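The Javadoc above points to jdkStream(int) as a variant that sends more poison pills for parallel Streams. A rough JDK-only sketch of why several pills can be needed follows. It assumes, as an illustration only, that each consuming thread stops after taking exactly one pill; the class `MultiPillSketch` and the method `consumeWith` are hypothetical names and do not reflect Cyclops internals.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: one sentinel per consumer thread lets every consumer stop.
final class MultiPillSketch {
    private static final Object POISON_PILL = new Object();

    // Starts `consumers` threads, feeds `items` elements, then sends one pill per consumer.
    static int consumeWith(int consumers, int items) {
        BlockingQueue<Object> queue = new LinkedBlockingQueue<>();
        AtomicInteger seen = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(consumers);

        for (int c = 0; c < consumers; c++) {
            pool.execute(() -> {
                try {
                    // Each consumer counts elements until it takes a pill.
                    while (queue.take() != POISON_PILL) {
                        seen.incrementAndGet();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }

        for (int i = 0; i < items; i++) {
            queue.offer(i);
        }
        // With fewer pills than consumers, some threads would block in take() forever.
        for (int c = 0; c < consumers; c++) {
            queue.offer(POISON_PILL);
        }

        pool.shutdown();
        try {
            if (!pool.awaitTermination(5, TimeUnit.SECONDS)) {
                pool.shutdownNow();
                throw new IllegalStateException("consumers did not stop");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen.get();
    }

    public static void main(String[] args) {
        System.out.println(consumeWith(4, 1000)); // prints 1000
    }
}
```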

Code example source: origin: aol/cyclops (the same code also appears under com.oath.cyclops/cyclops)

@Override
public Iterator<T> iterator() {
  return host.jdkStream().iterator();
}

Code example source: origin: aol/cyclops (the same code also appears under com.oath.cyclops/cyclops)

default <R> R foldParallel(Function<? super Stream<T>,? extends R> fn){
  Queue<T> queue = QueueFactories.<T>unboundedNonBlockingQueue().build().withTimeout(1);
  AtomicReference<Continuation> ref = new AtomicReference<>(null);
  Continuation cont =
      new Continuation(()->{
        if(ref.get()==null && ref.compareAndSet(null,Continuation.empty())){
          try {
            //use the first consuming thread to push this Stream's elements onto the Queue
            this.spliterator().forEachRemaining(queue::offer);
          }finally {
            //always close the Queue so the connected Stream can terminate
            queue.close();
          }
        }
        return Continuation.empty();
      });
  queue.addContinuation(cont);
  return fn.apply(queue.jdkStream().parallel());
}
default <R> R foldParallel(ForkJoinPool fj,Function<? super Stream<T>,? extends R> fn){
  // ... (the excerpt is truncated here in the source)
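The foldParallel excerpt above copies a Stream onto a queue, closes the queue, and hands a parallel JDK Stream connected to that queue to the folding function. A simplified JDK-only sketch of the same shape is given below. It is an approximation under stated assumptions: it fills the queue eagerly instead of using a Continuation on the first consuming thread, it uses a single sentinel to mark the end, and `FoldParallelSketch` and `sumOfSquares` are hypothetical names.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Function;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

// Hypothetical JDK-only sketch; it does not use the Cyclops Queue or Continuation types.
final class FoldParallelSketch {
    private static final Object POISON_PILL = new Object();

    static <T, R> R foldParallel(Stream<T> source,
                                 Function<? super Stream<T>, ? extends R> fn) {
        BlockingQueue<Object> queue = new LinkedBlockingQueue<>();
        try {
            // Eagerly copy every element onto the queue (the original does this lazily).
            source.forEach(queue::offer);
        } finally {
            // "Close" the queue so the connected Stream knows where the data ends.
            queue.offer(POISON_PILL);
        }

        Iterator<T> it = new Iterator<T>() {
            private Object next;
            private boolean done;

            @Override
            public boolean hasNext() {
                if (done) return false;
                if (next == null) {
                    next = queue.poll(); // queue is already filled, so no blocking is needed
                    if (next == null || next == POISON_PILL) {
                        done = true;
                        next = null;
                    }
                }
                return !done;
            }

            @SuppressWarnings("unchecked")
            @Override
            public T next() {
                if (!hasNext()) throw new NoSuchElementException();
                T value = (T) next;
                next = null;
                return value;
            }
        };

        Stream<T> connected = StreamSupport.stream(
                Spliterators.spliteratorUnknownSize(it, Spliterator.NONNULL), false);
        // Hand a parallel view of the queue-backed Stream to the folding function.
        return fn.apply(connected.parallel());
    }

    static Integer sumOfSquares(int n) {
        return foldParallel(IntStream.rangeClosed(1, n).boxed(),
                s -> s.mapToInt(i -> i * i).sum());
    }

    public static void main(String[] args) {
        System.out.println(sumOfSquares(10)); // prints 385
    }
}
```

The result does not depend on how the parallel Stream splits the work, because summation is associative; a fold that depends on encounter order would need more care.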

Code example source: origin: Nextdoor/bender

Stream<InternalEvent> forkInput = queue.jdkStream();
for (OperationProcessor opProcInFork : opProcsInFork) {
  forkInput = opProcInFork.perform(forkInput);
}
// ... (lines omitted in the source excerpt)
return outputQueue.jdkStream();

Code example source: origin: Nextdoor/bender

Stream<InternalEvent> conditionInput = queue.jdkStream();
for (OperationProcessor proc : procs) {
  conditionInput = proc.perform(conditionInput);
}
// ... (lines omitted in the source excerpt)
return outputQueue.jdkStream();

Code example source: origin: com.oath.cyclops/cyclops-futurestream

/**
 * Create a pushable JDK 8 Stream
 *
 * <pre>
 * {@code
 * PushableStream<Integer> pushable = StreamSource.ofUnbounded()
 *                                                .stream();
 * pushable.getInput()
 *         .add(10);
 * pushable.getInput()
 *         .close();
 * pushable.getStream().collect(CyclopsCollectors.toList()) //[10]
 *
 * }
 * </pre>
 *
 * @return PushableStream that can accept data to push into a Java 8 Stream
 */
public <T> PushableStream<T> stream() {
  final Queue<T> q = createQueue();
  return new PushableStream<T>(
                 q, q.jdkStream());
}

Code example source: origin: com.oath.cyclops/cyclops-futurestream

@Override
default ReactiveSeq<U> stream() {
   return Streams.oneShotStream(toQueue().jdkStream(getSubscription()));
}

Code example source: origin: Nextdoor/bender

Stream<InternalEvent> input = this.eventQueue.jdkStream();
