com.oath.cyclops.async.adapters.Queue.addContinuation()方法的使用及代码示例

x33g5p2x  于2022-01-28 转载在 其他  
字(6.6k)|赞(0)|评价(0)|浏览(170)

本文整理了Java中com.oath.cyclops.async.adapters.Queue.addContinuation方法的一些代码示例,展示了Queue.addContinuation的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。Queue.addContinuation方法的具体详情如下:
包路径:com.oath.cyclops.async.adapters.Queue
类名称:Queue
方法名:addContinuation

Queue.addContinuation介绍

暂无

代码示例

代码示例来源:origin: aol/cyclops

/**
 * Registers a continuation by forwarding it to the wrapped {@code queue}.
 *
 * @param c continuation handed unchanged to {@code queue.addContinuation}
 */
public void addContinuation(final Continuation c) {
  queue.addContinuation(c);
}

代码示例来源:origin: aol/cyclops

/**
 * Hands the same continuation to every element of
 * {@code distributor.subscribers}.
 *
 * @param cont continuation passed to each subscriber's {@code addContinuation}
 */
public void addContinuation(Continuation cont) {
  distributor.subscribers.forEach(subscriber -> subscriber.addContinuation(cont));
}

代码示例来源:origin: aol/cyclops

/**
 * Flags the counter as completable and, when {@code counter.active} reads zero,
 * closes the underlying queue.
 *
 * <p>Only the caller that flips {@code counter.closing} from {@code false} to
 * {@code true} goes on to register the closing continuation and close the queue.
 */
public void close() {
  counter.completable = true;

  // Nothing to close yet: no queue, or the active count is not zero.
  if (queue == null || counter.active.get() != 0) {
    return;
  }
  // Another caller already won the race to close.
  if (!counter.closing.compareAndSet(false, true)) {
    return;
  }

  counter.closed = true;
  // The registered continuation throws ClosedQueueException when it is run.
  final Continuation signalClosed = new Continuation(() -> {
    throw new ClosedQueueException();
  });
  queue.addContinuation(signalClosed);
  queue.close();
}

代码示例来源:origin: aol/cyclops

/**
 * Completion callback: decrements {@code counter.active}, removes this
 * subscription from {@code counter.subscription}, and closes the queue when the
 * active count reads zero and the counter has been marked completable.
 *
 * <p>Only the caller that flips {@code counter.closing} from {@code false} to
 * {@code true} registers the closing continuation and closes the queue.
 */
@Override
public void onComplete() {
  counter.active.decrementAndGet();
  counter.subscription.remove(subscription);

  // Checks run in the same order as before: queue, active count, completable flag, closing CAS.
  if (queue == null
      || counter.active.get() != 0
      || !counter.completable
      || !counter.closing.compareAndSet(false, true)) {
    return;
  }

  counter.closed = true;
  queue.addContinuation(new Continuation(() -> {
    // Collect whatever is still readable from the queue, then throw with that data attached.
    final List drained = new ArrayList();
    try {
      while (queue.size() > 0) {
        drained.add(queue.get());
      }
    } catch (ClosedQueueException e) {
      // The queue reported closed mid-drain: stop collecting and fall through to the throw.
    }
    throw new ClosedQueueException(drained);
  }));
  queue.close();
}

代码示例来源:origin: aol/cyclops

/**
 * Applies {@code fn} to a parallel JDK stream that reads from a queue fed by
 * this stream.
 *
 * <p>The queue carries a continuation; the first invocation that claims the
 * atomic reference pushes all remaining elements of this stream onto the queue
 * and then closes it. Every invocation returns {@code Continuation.empty()}.
 *
 * @param fn function applied to the parallel stream built from the queue
 * @return whatever {@code fn} returns
 */
default <R> R foldParallel(Function<? super Stream<T>,? extends R> fn){
  final Queue<T> queue = QueueFactories.<T>unboundedNonBlockingQueue().build().withTimeout(1);
  final AtomicReference<Continuation> claimed = new AtomicReference<>(null);

  queue.addContinuation(new Continuation(() -> {
    // Cheap read first, then the CAS decides which caller actually does the work.
    final boolean firstConsumer =
        claimed.get() == null && claimed.compareAndSet(null, Continuation.empty());
    if (firstConsumer) {
      try {
        // The first consuming thread pushes this stream's elements onto the queue.
        this.spliterator().forEachRemaining(queue::offer);
      } finally {
        queue.close();
      }
    }
    return Continuation.empty();
  }));

  return fn.apply(queue.jdkStream().parallel());
}
default <R> R foldParallel(ForkJoinPool fj,Function<? super Stream<T>,? extends R> fn){

代码示例来源:origin: aol/cyclops

/**
 * Exposes this reactive source as a {@link Stream}.
 *
 * <p>For any type other than {@code NO_BACKPRESSURE} the stream is built from
 * an {@code OperatorToIterable} spliterator. For {@code NO_BACKPRESSURE} the
 * elements are routed through an unbounded non-blocking queue. The queue's
 * continuation subscribes to the source the first time it runs, and the queue
 * is closed on both error and completion.
 *
 * @return a sequential stream over the elements of {@code source}
 */
@Override
public Stream<T> unwrapStream() {
  if (async != Type.NO_BACKPRESSURE) {
    return StreamSupport.stream(
        new OperatorToIterable<>(source, this.defaultErrorHandler, async == BACKPRESSURE).spliterator(),
        false);
  }

  final Queue<T> buffer = QueueFactories.<T>unboundedNonBlockingQueue().build();
  // Flipped once and never reset, so the subscribeAll call below runs at most once.
  final AtomicBoolean subscribed = new AtomicBoolean(false);

  buffer.addContinuation(new Continuation(() -> {
    if (subscribed.compareAndSet(false, true)) {
      // The error argument is ignored; both terminal signals just close the queue.
      this.source.subscribeAll(buffer::offer, i -> {
        buffer.close();
      }, () -> buffer.close());
    }
    return Continuation.empty();
  }));
  return buffer.stream();
}

代码示例来源:origin: aol/cyclops

/**
 * Feeds a copy of this sequence's spliterator into a {@code Signal}, one
 * element per continuation run, and returns the stream of the signal's
 * discrete view.
 *
 * <p>The continuation returns itself so it can be run again. When the
 * spliterator has no more elements it closes the signal and returns
 * {@code Continuation.empty()}.
 *
 * @return the stream obtained from {@code signal.getDiscrete()}
 */
public ReactiveSeq<T> changes(){
    final com.oath.cyclops.async.adapters.Queue<T> queue =
        QueueFactories.<T>unboundedNonBlockingQueue().build();
    final Spliterator<T> source = copy();
    // One-element array so the lambda can refer to the continuation it is part of.
    final Continuation[] self = {null};
    final Signal<T> signal = new Signal<T>(null, queue);
    final AtomicBoolean wip = new AtomicBoolean(false);

    final Continuation cont = new Continuation(() -> {
      // Another run is in progress: do nothing this time, but stay registered.
      if (!wip.compareAndSet(false, true)) {
        return self[0];
      }
      // Source exhausted: close the signal; wip is left set, as in the original flow.
      if (!source.tryAdvance(signal::set)) {
        signal.close();
        return Continuation.empty();
      }
      wip.set(false);
      return self[0];
    });
    self[0] = cont;

    queue.addContinuation(cont);
    return signal.getDiscrete().stream();
}

代码示例来源:origin: aol/cyclops

/**
 * Creates a {@code Topic} backed by an unbounded non-blocking queue and wires
 * it to this stream through a continuation.
 *
 * <p>Each continuation run that wins the {@code wip} flag offers at most one
 * element from this stream's spliterator to the topic. When the spliterator has
 * no more elements the topic is closed and {@code Continuation.empty()} is
 * returned; otherwise the continuation returns itself.
 *
 * @return the topic that receives this stream's elements
 */
default Topic<T> broadcast(){
  final Queue<T> queue = QueueFactories.<T>unboundedNonBlockingQueue()
                                       .build()
                                       .withTimeout(1);
  final Topic<T> topic = new Topic<T>(queue, QueueFactories.<T>unboundedNonBlockingQueue());
  final AtomicBoolean wip = new AtomicBoolean(false);
  final Spliterator<T> split = this.spliterator();
  // One-element array so the lambda can refer to the continuation it is part of.
  final Continuation[] self = {null};

  final Continuation cont = new Continuation(() -> {
    // Another run is in progress: skip the work but stay registered.
    if (!wip.compareAndSet(false, true)) {
      return self[0];
    }
    try {
      // The consuming thread that won the flag pushes one element onto the topic.
      if (!split.tryAdvance(topic::offer)) {
        topic.close();
        return Continuation.empty();
      }
    } finally {
      wip.set(false);
    }
    return self[0];
  });
  self[0] = cont;

  queue.addContinuation(cont);
  return topic;
}

代码示例来源:origin: aol/cyclops

/**
 * Returns an iterator over the elements of {@code source}.
 *
 * <p>For {@code NO_BACKPRESSURE} the elements are routed through an unbounded
 * non-blocking queue. The queue's continuation subscribes to the source the
 * first time it runs, and the queue is closed on both error and completion.
 * For every other type the iterator comes from an {@code OperatorToIterable}.
 *
 * @return an iterator over the elements emitted by {@code source}
 */
@Override
public Iterator<T> iterator() {
  if (async == Type.NO_BACKPRESSURE) {
    Queue<T> queue = QueueFactories.<T>unboundedNonBlockingQueue()
        .build();
    // Flipped once and never reset, so the subscribeAll call below runs at most once.
    AtomicBoolean wip = new AtomicBoolean(false);
    Continuation cont = new Continuation(() -> {
      if (wip.compareAndSet(false, true)) {
        // The error argument is ignored; both terminal signals just close the queue.
        this.source.subscribeAll(queue::offer,
            i -> queue.close(),
            () -> queue.close());
      }
      return Continuation.empty();
    });
    queue.addContinuation(cont);
    return queue.stream().iterator();
  }
  return new OperatorToIterable<>(source, this.defaultErrorHandler, async == BACKPRESSURE).iterator();
}

代码示例来源:origin: aol/cyclops

queue.addContinuation(cont);
  return topic;
queue.addContinuation(cont);
return topic;

代码示例来源:origin: com.oath.cyclops/cyclops

/**
 * Registers a continuation by forwarding it to the wrapped {@code queue}.
 *
 * @param c continuation handed unchanged to {@code queue.addContinuation}
 */
public void addContinuation(final Continuation c) {
  queue.addContinuation(c);
}

代码示例来源:origin: aol/cyclops

queue.addContinuation(cont);

代码示例来源:origin: com.oath.cyclops/cyclops-futurestream

/**
 * Registers a continuation by forwarding it to the wrapped {@code queue}.
 *
 * @param c continuation handed unchanged to {@code queue.addContinuation}
 */
public void addContinuation(final Continuation c) {
  queue.addContinuation(c);
}

代码示例来源:origin: aol/cyclops

queue.addContinuation(cont);
return queue.stream();

代码示例来源:origin: aol/cyclops

queue.addContinuation(cont);
return queue.stream();

代码示例来源:origin: com.oath.cyclops/cyclops

/**
 * Hands the same continuation to every element of
 * {@code distributor.subscribers}.
 *
 * @param cont continuation passed to each subscriber's {@code addContinuation}
 */
public void addContinuation(Continuation cont) {
  distributor.subscribers.forEach(subscriber -> subscriber.addContinuation(cont));
}

代码示例来源:origin: com.oath.cyclops/cyclops-futurestream

/**
 * Flags the counter as completable and, when {@code counter.active} reads zero,
 * closes the underlying queue.
 *
 * <p>Only the caller that flips {@code counter.closing} from {@code false} to
 * {@code true} goes on to register the closing continuation and close the queue.
 */
public void close() {
  counter.completable = true;

  // Nothing to close yet: no queue, or the active count is not zero.
  if (queue == null || counter.active.get() != 0) {
    return;
  }
  // Another caller already won the race to close.
  if (!counter.closing.compareAndSet(false, true)) {
    return;
  }

  counter.closed = true;
  // The registered continuation throws ClosedQueueException when it is run.
  final Continuation signalClosed = new Continuation(() -> {
    throw new ClosedQueueException();
  });
  queue.addContinuation(signalClosed);
  queue.close();
}

代码示例来源:origin: com.oath.cyclops/cyclops

/**
 * Flags the counter as completable and, when {@code counter.active} reads zero,
 * closes the underlying queue.
 *
 * <p>Only the caller that flips {@code counter.closing} from {@code false} to
 * {@code true} goes on to register the closing continuation and close the queue.
 */
public void close() {
  counter.completable = true;

  // Nothing to close yet: no queue, or the active count is not zero.
  if (queue == null || counter.active.get() != 0) {
    return;
  }
  // Another caller already won the race to close.
  if (!counter.closing.compareAndSet(false, true)) {
    return;
  }

  counter.closed = true;
  // The registered continuation throws ClosedQueueException when it is run.
  final Continuation signalClosed = new Continuation(() -> {
    throw new ClosedQueueException();
  });
  queue.addContinuation(signalClosed);
  queue.close();
}

代码示例来源:origin: com.oath.cyclops/cyclops-futurestream

/**
 * Builds a queue from this stream's queue factory, passes it through
 * {@code fn}, and attaches a continuation derived from this stream that adds
 * each element to the resulting queue.
 *
 * @param fn applied to the freshly built queue; its result is the queue that
 *           is populated and returned
 * @return the queue produced by {@code fn}, with the continuation registered
 */
@Override
default Queue<U> toQueue(final Function<Queue, Queue> fn) {
  final Queue<U> queue = fn.apply(getQueueFactory().build());
  // Each element is added to the queue; the trailing runnable closes the queue
  // (presumably invoked once the stream has finished; confirm in runContinuation).
  final Continuation continuation = thenSync(queue::add).self(s -> {
    // When the populator reports pooling as active, add a stage that throws
    // CompletedException carrying the value it receives.
    if (this.getPopulator().isPoolingActive())
      s.peekSync(v -> {
        throw new CompletedException(
                       v);
      });
  }).runContinuation(() -> {queue.close();});
  queue.addContinuation(continuation);
  return queue;
}

代码示例来源:origin: com.oath.cyclops/cyclops-futurestream

/**
 * Attaches a continuation derived from this stream to the supplied queue so
 * that each element is added to it.
 *
 * <p>Unlike a variant that closes the queue, the trailing runnable here throws
 * {@code ClosedQueueException} and leaves the caller-supplied queue open.
 *
 * @param queue queue that receives the elements and the continuation
 */
@Override
default void addToQueue(final Queue queue) {
  // Each element is added to the queue; the trailing runnable is presumably
  // invoked once the stream has finished (confirm in runContinuation).
  final Continuation continuation = thenSync(queue::add).self(s -> {
    // When the populator reports pooling as active, add a stage that throws
    // CompletedException carrying the value it receives.
    if (this.getPopulator()
        .isPoolingActive())
      s.peekSync(v -> {
        throw new CompletedException(
                       v);
      });
  }).runContinuation(() -> {throw new ClosedQueueException();
                }
            );
  queue.addContinuation(continuation);
}

相关文章