com.oath.cyclops.async.adapters.Queue.stream()方法的使用及代码示例

x33g5p2x  于2022-01-28 转载在 其他  
字(6.3k)|赞(0)|评价(0)|浏览(164)

本文整理了Java中com.oath.cyclops.async.adapters.Queue.stream方法的一些代码示例,展示了Queue.stream的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。Queue.stream方法的具体详情如下:
包路径:com.oath.cyclops.async.adapters.Queue
类名称:Queue
方法名:stream

Queue.stream介绍

暂无

代码示例

代码示例来源:origin: aol/cyclops

/**
 * Streams data using the supplied subscription.
 *
 * Delegates to {@code connect}, handing it a function that calls
 * {@code stream(s)} on whatever adapter {@code connect} supplies.
 *
 * @param s subscription forwarded to the connected adapter's {@code stream} call
 * @return the sequence produced by {@code connect}
 */
@Override
public ReactiveSeq<T> stream(final Continueable s) {
  return connect(mailbox -> mailbox.stream(s));
}

代码示例来源:origin: aol/cyclops

/**
 * Wraps every element of {@link #stream()} in an already-completed
 * {@link CompletableFuture}.
 *
 * The returned Stream is itself sequential; it is intended as input to a
 * SimpleReact concurrent dataflow, which applies concurrency / parallelism
 * via the constituent CompletableFutures.
 *
 * @return Infinite (until Queue is closed) Stream of CompletableFutures
 */
@Override
public ReactiveSeq<CompletableFuture<T>> streamCompletableFutures() {
  return stream().map(value -> CompletableFuture.completedFuture(value));
}

代码示例来源:origin: aol/cyclops

/**
 * Creates a Stream of the data published to this topic.
 *
 * Generating a stream registers it as a reactive subscriber to this topic;
 * the subscriber is given an internal Queue that acts as its mailbox.
 * See {@code Topic.disconnect} for how to disconnect from the topic.
 *
 * @return Stream of data
 */
@Override
public ReactiveSeq<T> stream() {
  return connect(Queue::stream);
}

代码示例来源:origin: aol/cyclops

/**
 * Builds a Stream over {@code queue}, driven by a newly created Subscription.
 *
 * @return the Stream returned by {@code queue.stream(subscription)}
 */
private Stream<T> genJdkStream() {
  final Continueable freshSubscription =
      new com.oath.cyclops.react.async.subscription.Subscription();
  final Stream<T> jdkStream = queue.stream(freshSubscription);
  return jdkStream;
}

代码示例来源:origin: aol/cyclops

/**
 * Exposes this sequence as a plain {@link Stream}.
 *
 * Unless {@code async} is {@code Type.NO_BACKPRESSURE}, the Stream is built
 * from the spliterator of an {@code OperatorToIterable} over {@code source}.
 * Otherwise an unbounded non-blocking Queue is created and a Continuation is
 * registered on it; the first time that Continuation runs it subscribes
 * {@code source} to the Queue (elements are offered to the Queue, and the
 * Queue is closed on error or on completion). The Queue's stream is returned.
 *
 * @return a Stream over the elements of {@code source}
 */
@Override
public Stream<T> unwrapStream() {
  if (async != Type.NO_BACKPRESSURE) {
    final boolean backpressured = async == BACKPRESSURE;
    return StreamSupport.stream(
        new OperatorToIterable<>(source, this.defaultErrorHandler, backpressured).spliterator(),
        false);
  }

  final Queue<T> buffer = QueueFactories.<T>unboundedNonBlockingQueue().build();
  // Flipped exactly once so that source is subscribed a single time,
  // however often the continuation is invoked.
  final AtomicBoolean subscribed = new AtomicBoolean(false);
  buffer.addContinuation(new Continuation(() -> {
    if (subscribed.compareAndSet(false, true)) {
      this.source.subscribeAll(
          buffer::offer,
          error -> buffer.close(),
          () -> buffer.close());
    }
    return Continuation.empty();
  }));
  return buffer.stream();
}

代码示例来源:origin: aol/cyclops

/**
 * Returns an iterator over the elements emitted by {@code source}.
 *
 * When {@code async} is {@code Type.NO_BACKPRESSURE}, an unbounded
 * non-blocking Queue is created and a Continuation is registered on it. The
 * first time that Continuation runs it subscribes {@code source} to the
 * Queue: elements are offered to the Queue, and the Queue is closed on error
 * or on completion. The iterator of the Queue's stream is returned.
 * Otherwise the iterator of an {@code OperatorToIterable} over
 * {@code source} is returned.
 *
 * @return an iterator over the elements of {@code source}
 */
@Override
public Iterator<T> iterator() {
  if (async == Type.NO_BACKPRESSURE) {
    Queue<T> queue = QueueFactories.<T>unboundedNonBlockingQueue()
        .build();
    // Ensures source is subscribed only once, even if the continuation runs repeatedly.
    AtomicBoolean wip = new AtomicBoolean(false);
    Continuation cont = new Continuation(() -> {
      if (wip.compareAndSet(false, true)) {
        this.source.subscribeAll(queue::offer,
            i -> queue.close(),
            () -> queue.close());
      }
      return Continuation.empty();
    });
    queue.addContinuation(cont);
    return queue.stream().iterator();
  }
  return new OperatorToIterable<>(source, this.defaultErrorHandler, async == BACKPRESSURE).iterator();
}

代码示例来源:origin: aol/cyclops

/**
 * Publishes 1, 2, 3 to a bounded non-blocking Queue while a background thread
 * closes that Queue after a short sleep, then asserts that streaming the
 * Queue yields exactly 1, 2, 3. The scenario is repeated ITERATIONS times.
 */
@Test
public void publishTest() {
  for (int k = 0; k < ITERATIONS; k++) {
    Queue<Integer> queue = QueueFactories.<Integer>boundedNonBlockingQueue(10)
        .build();
    // Background thread: sleeps 100 ms, logs the queue size, then closes the queue.
    Thread t = new Thread(() -> {
      try {
        System.out.println("Sleeping!");
        Thread.sleep(100);
      } catch (InterruptedException e) {
        // Interruption is only logged; the thread still goes on to close the queue.
        e.printStackTrace();
      }
      System.out.println("Waking!");
      System.out.println("Closing! " + queue.size());
      queue.close();
    });
    t.start();
    // Publish each element to the queue (logging it first) and print what flows through.
    of(1, 2, 3).peek(i -> System.out.println("publishing " + i))
        .publishTo(queue)
        .forEach(System.out::println);
    // Collecting the queue's stream must give back the published elements in order.
    assertThat(queue.stream().collect(Collectors.toList()), equalTo(Arrays.asList(1, 2, 3)));
    // Drops the thread reference and requests a GC between iterations.
    // NOTE(review): presumably to limit resource build-up across iterations — confirm intent.
    t = null;
    System.gc();
  }
}

代码示例来源:origin: aol/cyclops

return queue.stream();

代码示例来源:origin: aol/cyclops

return queue.stream();

代码示例来源:origin: aol/cyclops

/**
 * Adds 1..3 to a Queue and prints the first three streamed elements, then
 * adds 4 and 5 and prints the first two elements of a second stream.
 */
@Test
public void queueTest(){
  com.oath.cyclops.async.adapters.Queue<Integer> mailbox = new Queue<>();

  for (int value = 1; value <= 3; value++) {
    mailbox.add(value);
  }
  mailbox.stream()
      .limit(3)
      .forEach(System.out::println);

  for (int value = 4; value <= 5; value++) {
    mailbox.add(value);
  }
  mailbox.stream()
      .limit(2)
      .forEach(System.out::println);
}
@Test

代码示例来源:origin: com.oath.cyclops/cyclops

/**
 * Maps each element of {@link #stream()} to a {@link CompletableFuture}
 * that is already completed with that element.
 *
 * This Stream itself is sequential; SimpleReact applies concurrency /
 * parallelism through the constituent CompletableFutures when the Stream is
 * used as input to a SimpleReact concurrent dataflow.
 *
 * @return Infinite (until Queue is closed) Stream of CompletableFutures
 */
@Override
public ReactiveSeq<CompletableFuture<T>> streamCompletableFutures() {
  return stream().map(element -> CompletableFuture.completedFuture(element));
}

代码示例来源:origin: com.oath.cyclops/cyclops

/**
 * Streams the data published to this topic.
 *
 * Calling this method registers the resulting Stream as a reactive
 * subscriber to the topic and provides it with an internal Queue as a
 * mailbox. See {@code Topic.disconnect} to disconnect from the topic.
 *
 * @return Stream of data
 */
@Override
public ReactiveSeq<T> stream() {
  return connect(Queue::stream);
}

代码示例来源:origin: com.oath.cyclops/cyclops

/**
 * Streams data under the control of the given subscription.
 *
 * The work is delegated to {@code connect}; the function passed to it calls
 * {@code stream(s)} on the adapter that {@code connect} provides.
 *
 * @param s subscription passed through to the connected adapter's {@code stream} call
 * @return the sequence returned by {@code connect}
 */
@Override
public ReactiveSeq<T> stream(final Continueable s) {
  return connect(connected -> connected.stream(s));
}

代码示例来源:origin: com.oath.cyclops/cyclops

/**
 * Creates a new Subscription and uses it to open a Stream over {@code queue}.
 *
 * @return the Stream obtained from {@code queue.stream(...)}
 */
private Stream<T> genJdkStream() {
  final Continueable streamSubscription =
      new com.oath.cyclops.react.async.subscription.Subscription();
  final Stream<T> result = queue.stream(streamSubscription);
  return result;
}

代码示例来源:origin: com.oath.cyclops/cyclops-futurestream

/**
 * Opens a Stream over {@code queue} that is tied to a freshly created Subscription.
 *
 * @return the Stream produced by {@code queue.stream(...)} for the new subscription
 */
private Stream<T> genJdkStream() {
  final Continueable newSubscription =
      new com.oath.cyclops.react.async.subscription.Subscription();
  final Stream<T> queueStream = queue.stream(newSubscription);
  return queueStream;
}

代码示例来源:origin: com.oath.cyclops/cyclops-futurestream

/**
 * Wraps a Stream in a SimpleReactStream.
 *
 * If the supplied Stream is a FutureStream, it is first replaced by the
 * stream of its Queue (obtained via {@code toQueue()}), using the
 * FutureStream's own subscription. Each element is then wrapped in an
 * already-completed CompletableFuture and handed to a new
 * {@code SimpleReactStreamImpl} backed by a SimpleReact built on
 * {@code ThreadPools.getCurrentThreadExecutor()}.
 *
 * @param stream the Stream to wrap
 * @return a SimpleReactStream over the elements of {@code stream}
 */
static <T> SimpleReactStream<T> simpleReactStream(Stream<T> stream) {
  if (stream instanceof FutureStream) {
    final FutureStream futureStream = (FutureStream) stream;
    stream = futureStream.toQueue()
                         .stream(futureStream.getSubscription());
  }
  final SimpleReact builder = new SimpleReact(ThreadPools.getCurrentThreadExecutor(), false);
  return new SimpleReactStreamImpl<T>(builder, stream.map(CompletableFuture::completedFuture));
}

代码示例来源:origin: com.oath.cyclops/cyclops-futurestream

/**
 * Builds a FutureStream over {@code queue}.
 *
 * A single new Subscription is created and used both as the subscription of
 * the FutureStream and as the subscription passed to {@code queue.stream(...)}.
 *
 * @return a FutureStream reading from {@code queue}
 */
private FutureStream<T> genStream() {
  final Continueable sharedSubscription =
      new com.oath.cyclops.react.async.subscription.Subscription();
  return new LazyReact()
      .of()
      .withSubscription(sharedSubscription)
      .fromStream(queue.stream(sharedSubscription));
}

代码示例来源:origin: com.oath.cyclops/cyclops-futurestream

/**
 * Returns a CloseableIterator over this stream's elements.
 *
 * The stream is first converted to a Queue via {@code toQueue()}. If the
 * subscription is not closed, the iterator reads from the Queue's stream and
 * is given the Queue; if it is already closed, an empty iterator is returned
 * and no Queue is passed to the CloseableIterator.
 *
 * @return an iterator bound to this stream's subscription
 */
default Iterator<U> iterator() {
  // toQueue() is invoked before the closed check, regardless of the outcome.
  final Queue<U> queue = toQueue();
  if (!getSubscription().closed()) {
    return new CloseableIterator<>(
        queue.stream(getSubscription()).iterator(),
        getSubscription(),
        queue);
  }
  return new CloseableIterator<>(
      Arrays.<U> asList().iterator(),
      getSubscription(),
      null);
}

代码示例来源:origin: com.oath.cyclops/cyclops-futurestream

/**
 * Limits this stream to at most {@code maxSize} elements.
 *
 * The limit is registered on the subscription, and the result is built from
 * a one-shot stream over this stream's Queue with the same limit applied.
 *
 * @param maxSize value registered on the subscription and applied as the stream limit
 * @return a FutureStream created via {@code fromStream} from the limited stream
 */
@Override
default FutureStream<U> limit(final long maxSize) {
  final Continueable subscription = getSubscription();
  subscription.registerLimit(maxSize);
  return fromStream(
      ReactiveSeq.oneShotStream(toQueue().stream(subscription))
                 .limit(maxSize));
}

代码示例来源:origin: com.oath.cyclops/cyclops-futurestream

/**
 * Skips the first {@code n} elements of this stream.
 *
 * The skip count is registered on the subscription, and the result is built
 * from a one-shot stream over this stream's Queue with the same skip applied.
 *
 * @param n value registered on the subscription and applied as the stream skip
 * @return a FutureStream created via {@code fromStream} from the skipped stream
 */
@Override
default FutureStream<U> skip(final long n) {
  final Continueable subscription = getSubscription();
  subscription.registerSkip(n);
  return fromStream(
      ReactiveSeq.oneShotStream(toQueue().stream(subscription))
                 .skip(n));
}

相关文章