本文整理了Java中com.oath.cyclops.async.adapters.Queue.addContinuation
方法的一些代码示例,展示了Queue.addContinuation
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。Queue.addContinuation
方法的具体详情如下:
包路径:com.oath.cyclops.async.adapters.Queue
类名称:Queue
方法名:addContinuation
暂无
代码示例来源:origin: aol/cyclops
/**
 * Registers a continuation by forwarding it, unchanged, to {@code queue}.
 *
 * @param c continuation passed straight through to {@code queue.addContinuation}
 */
public void addContinuation(final Continuation c) {
queue.addContinuation(c);
}
代码示例来源:origin: aol/cyclops
/**
 * Adds the same continuation to every element of {@code distributor.subscribers}.
 *
 * @param cont continuation handed to each subscriber's {@code addContinuation}
 */
public void addContinuation(Continuation cont) {
// The one continuation instance is shared by all subscribers; no copy is made per subscriber.
distributor.subscribers.forEach(q->q.addContinuation(cont));
}
代码示例来源:origin: aol/cyclops
/**
 * Flags the counter as completable and, if a queue exists and the active
 * count reads zero, closes that queue.
 *
 * <p>The shutdown steps run only for the caller whose
 * {@code compareAndSet(false, true)} on {@code counter.closing} succeeds.
 * Before the queue is closed, a continuation that throws
 * {@link ClosedQueueException} is installed on it.
 */
public void close() {
    counter.completable = true;

    if (queue == null || counter.active.get() != 0) {
        return;
    }
    if (!counter.closing.compareAndSet(false, true)) {
        // The closing flag was already set; nothing more to do here.
        return;
    }

    counter.closed = true;
    queue.addContinuation(new Continuation(() -> {
        throw new ClosedQueueException();
    }));
    queue.close();
}
代码示例来源:origin: aol/cyclops
/**
 * Completion callback. Decrements the active count and removes
 * {@code subscription} from the counter. It then closes the queue if a queue
 * exists, the active count reads zero, the counter is completable, and this
 * caller wins the {@code compareAndSet} on {@code counter.closing}.
 *
 * <p>The continuation installed before closing collects whatever
 * {@code queue.get()} still yields while {@code queue.size() > 0} and then
 * throws a {@link ClosedQueueException} carrying the collected elements.
 */
@Override
public void onComplete() {
    counter.active.decrementAndGet();
    counter.subscription.remove(subscription);

    if (queue == null || counter.active.get() != 0) {
        return;
    }
    if (!counter.completable) {
        return;
    }
    if (!counter.closing.compareAndSet(false, true)) {
        return;
    }

    counter.closed = true;
    queue.addContinuation(new Continuation(() -> {
        final List drained = new ArrayList();
        while (queue.size() > 0) {
            try {
                drained.add(queue.get());
            } catch (ClosedQueueException alreadyClosed) {
                // Stop collecting as soon as the queue reports it is closed.
                break;
            }
        }
        throw new ClosedQueueException(drained);
    }));
    queue.close();
}
代码示例来源:origin: aol/cyclops
/**
 * Applies {@code fn} to a parallel JDK stream backed by a non-blocking queue
 * that is filled from this stream's spliterator.
 *
 * <p>The transfer onto the queue happens inside a continuation registered on
 * the queue. Only the first invocation that claims the {@code AtomicReference}
 * performs the transfer, and the queue is closed afterwards even if the
 * transfer throws.
 *
 * @param fn  function applied to the parallel stream view of the queue
 * @param <R> result type produced by {@code fn}
 * @return the value returned by {@code fn}
 */
default <R> R foldParallel(Function<? super Stream<T>,? extends R> fn){
    final Queue<T> queue = QueueFactories.<T>unboundedNonBlockingQueue()
                                         .build()
                                         .withTimeout(1);
    // Acts as a run-once marker: stays null until one caller claims the transfer.
    final AtomicReference<Continuation> claimed = new AtomicReference<>(null);

    final Continuation populate = new Continuation(() -> {
        final boolean firstCaller = claimed.get() == null
                && claimed.compareAndSet(null, Continuation.empty());
        if (firstCaller) {
            try {
                // Push every remaining element of this stream onto the queue.
                this.spliterator().forEachRemaining(queue::offer);
            } finally {
                queue.close();
            }
        }
        return Continuation.empty();
    });

    queue.addContinuation(populate);
    return fn.apply(queue.jdkStream().parallel());
}
default <R> R foldParallel(ForkJoinPool fj,Function<? super Stream<T>,? extends R> fn){
代码示例来源:origin: aol/cyclops
/**
 * Exposes this publisher as a {@link Stream}.
 *
 * <p>Unless {@code async} is {@code NO_BACKPRESSURE}, the stream is built
 * from an {@code OperatorToIterable} over {@code source}. Otherwise a
 * non-blocking queue is created and a continuation registered on it
 * subscribes to {@code source} at most once (guarded by an
 * {@code AtomicBoolean}). Elements are offered to the queue, and the queue
 * is closed on error and on completion.
 *
 * @return a sequential stream over the emitted elements
 */
@Override
public Stream<T> unwrapStream() {
    if (async != Type.NO_BACKPRESSURE) {
        return StreamSupport.stream(new OperatorToIterable<>(source, this.defaultErrorHandler, async == BACKPRESSURE).spliterator(), false);
    }

    final Queue<T> queue = QueueFactories.<T>unboundedNonBlockingQueue()
                                         .build();
    final AtomicBoolean subscribed = new AtomicBoolean(false);

    final Continuation subscribeOnce = new Continuation(() -> {
        if (subscribed.compareAndSet(false, true)) {
            this.source.subscribeAll(
                    queue::offer,
                    error -> {
                        queue.close();
                    },
                    () -> queue.close());
        }
        return Continuation.empty();
    });

    queue.addContinuation(subscribeOnce);
    return queue.stream();
}
代码示例来源:origin: aol/cyclops
/**
 * Builds a stream from the discrete view of a {@code Signal} that is fed,
 * one element per continuation run, from a copy of this sequence's
 * spliterator.
 *
 * <p>The continuation returns itself (through a one-element array) so it can
 * be run again. When the spliterator has no more elements, the signal is
 * closed and an empty continuation is returned instead.
 *
 * @return the stream obtained from {@code signal.getDiscrete()}
 */
public ReactiveSeq<T> changes(){
    final com.oath.cyclops.async.adapters.Queue<T> queue =
            QueueFactories.<T>unboundedNonBlockingQueue().build();
    final Spliterator<T> source = copy();
    final Signal<T> signal = new Signal<T>(null, queue);
    final AtomicBoolean busy = new AtomicBoolean(false);
    // Holder that lets the lambda refer to the continuation it belongs to.
    final Continuation[] self = {null};

    final Continuation step = new Continuation(() -> {
        if (!busy.compareAndSet(false, true)) {
            // Another run is in progress; just reschedule.
            return self[0];
        }
        if (copyAdvanced(source, signal)) {
            busy.set(false);
            return self[0];
        }
        // Exhausted: the busy flag is intentionally left set, as before.
        signal.close();
        return Continuation.empty();
    });

    self[0] = step;
    queue.addContinuation(step);
    return signal.getDiscrete().stream();
}

/** Advances the spliterator by one element, publishing it through {@code signal.set}. */
private boolean copyAdvanced(final Spliterator<T> source, final Signal<T> signal) {
    return source.tryAdvance(signal::set);
}
代码示例来源:origin: aol/cyclops
/**
 * Creates a {@code Topic} over a non-blocking queue and registers a
 * continuation that offers one element of this stream's spliterator to the
 * topic per run.
 *
 * <p>An {@code AtomicBoolean} prevents overlapping runs and is always reset
 * in a {@code finally} block. Once the spliterator is exhausted, the topic is
 * closed and an empty continuation is returned. Otherwise the continuation
 * returns itself.
 *
 * @return the topic backed by the newly created queue
 */
default Topic<T> broadcast(){
    final Queue<T> queue = QueueFactories.<T>unboundedNonBlockingQueue()
                                         .build()
                                         .withTimeout(1);
    final Topic<T> topic = new Topic<T>(queue, QueueFactories.<T>unboundedNonBlockingQueue());
    final AtomicBoolean busy = new AtomicBoolean(false);
    final Spliterator<T> elements = this.spliterator();
    // Holder that lets the lambda return the continuation it belongs to.
    final Continuation[] self = {null};

    final Continuation step = new Continuation(() -> {
        if (!busy.compareAndSet(false, true)) {
            return self[0];
        }
        try {
            // Offer the next element to the topic; close it when nothing is left.
            final boolean advanced = elements.tryAdvance(topic::offer);
            if (!advanced) {
                topic.close();
                return Continuation.empty();
            }
        } finally {
            busy.set(false);
        }
        return self[0];
    });

    self[0] = step;
    queue.addContinuation(step);
    return topic;
}
代码示例来源:origin: aol/cyclops
/**
 * Returns an iterator over the elements emitted by {@code source}.
 *
 * <p>When {@code async} is {@code NO_BACKPRESSURE}, a non-blocking queue is
 * created and a continuation registered on it subscribes to {@code source}
 * at most once (guarded by an {@code AtomicBoolean}). Elements are offered
 * to the queue, and the queue is closed on error and on completion. In every
 * other mode the iterator comes from an {@code OperatorToIterable}.
 *
 * @return an iterator over the emitted elements
 */
@Override
public Iterator<T> iterator() {
    if (async == Type.NO_BACKPRESSURE) {
        Queue<T> queue = QueueFactories.<T>unboundedNonBlockingQueue()
                                       .build();
        AtomicBoolean wip = new AtomicBoolean(false);
        // The unused Subscription[] holder was removed: it was never read or assigned.
        Continuation cont = new Continuation(() -> {
            if (wip.compareAndSet(false, true)) {
                this.source.subscribeAll(queue::offer,
                        i -> queue.close(),
                        () -> queue.close());
            }
            return Continuation.empty();
        });
        queue.addContinuation(cont);
        return queue.stream().iterator();
    }
    return new OperatorToIterable<>(source, this.defaultErrorHandler, async == BACKPRESSURE).iterator();
}
代码示例来源:origin: aol/cyclops
queue.addContinuation(cont);
return topic;
queue.addContinuation(cont);
return topic;
代码示例来源:origin: com.oath.cyclops/cyclops
/**
 * Passes the given continuation on to {@code queue} without modification.
 *
 * @param c continuation to register via {@code queue.addContinuation}
 */
public void addContinuation(final Continuation c) {
queue.addContinuation(c);
}
代码示例来源:origin: aol/cyclops
queue.addContinuation(cont);
代码示例来源:origin: com.oath.cyclops/cyclops-futurestream
/**
 * Delegates continuation registration to {@code queue}.
 *
 * @param c continuation forwarded as-is to {@code queue.addContinuation}
 */
public void addContinuation(final Continuation c) {
queue.addContinuation(c);
}
代码示例来源:origin: aol/cyclops
queue.addContinuation(cont);
return queue.stream();
代码示例来源:origin: aol/cyclops
queue.addContinuation(cont);
return queue.stream();
代码示例来源:origin: com.oath.cyclops/cyclops
/**
 * Registers one continuation with each entry of {@code distributor.subscribers}.
 *
 * @param cont continuation given to every subscriber's {@code addContinuation}
 */
public void addContinuation(Continuation cont) {
// Every subscriber receives the same continuation instance.
distributor.subscribers.forEach(q->q.addContinuation(cont));
}
代码示例来源:origin: com.oath.cyclops/cyclops-futurestream
/**
 * Sets {@code counter.completable} and closes the queue when one exists and
 * the active count reads zero.
 *
 * <p>Only the caller that flips {@code counter.closing} from {@code false} to
 * {@code true} proceeds. It marks the counter closed, installs a continuation
 * that throws {@link ClosedQueueException}, and then closes the queue.
 */
public void close() {
    counter.completable = true;

    final boolean nothingToClose = queue == null || counter.active.get() != 0;
    if (nothingToClose) {
        return;
    }
    if (!counter.closing.compareAndSet(false, true)) {
        return;
    }

    counter.closed = true;
    queue.addContinuation(new Continuation(() -> {
        throw new ClosedQueueException();
    }));
    queue.close();
}
代码示例来源:origin: com.oath.cyclops/cyclops
/**
 * Marks the counter completable, then shuts the queue down if it is present
 * and the active count reads zero.
 *
 * <p>The {@code compareAndSet} on {@code counter.closing} gates the shutdown
 * sequence: set {@code counter.closed}, add a continuation that throws
 * {@link ClosedQueueException}, close the queue.
 */
public void close() {
    counter.completable = true;

    if (queue == null) {
        return;
    }
    if (counter.active.get() != 0) {
        return;
    }
    if (!counter.closing.compareAndSet(false, true)) {
        return;
    }

    counter.closed = true;
    queue.addContinuation(new Continuation(() -> {
        throw new ClosedQueueException();
    }));
    queue.close();
}
代码示例来源:origin: com.oath.cyclops/cyclops-futurestream
/**
 * Builds a queue from this stream's queue factory, passes it through
 * {@code fn}, and registers a continuation on the result that adds this
 * stream's elements to it.
 *
 * <p>When the populator reports pooling as active, a {@code peekSync} stage
 * that throws {@link CompletedException} with the element is attached. The
 * runnable given to {@code runContinuation} closes the queue.
 *
 * @param fn transformation applied to the freshly built queue
 * @return the queue returned by {@code fn}, with the continuation registered
 */
@Override
default Queue<U> toQueue(final Function<Queue, Queue> fn) {
    final Queue<U> queue = fn.apply(getQueueFactory().build());

    final Continuation continuation = thenSync(queue::add)
            .self(s -> {
                if (!this.getPopulator().isPoolingActive()) {
                    return;
                }
                s.peekSync(v -> {
                    throw new CompletedException(v);
                });
            })
            .runContinuation(() -> {
                queue.close();
            });

    queue.addContinuation(continuation);
    return queue;
}
代码示例来源:origin: com.oath.cyclops/cyclops-futurestream
/**
 * Registers a continuation on the supplied queue that adds this stream's
 * elements to it.
 *
 * <p>When the populator reports pooling as active, a {@code peekSync} stage
 * that throws {@link CompletedException} with the element is attached. The
 * runnable given to {@code runContinuation} throws
 * {@link ClosedQueueException}.
 *
 * @param queue queue that receives the elements and the continuation
 */
@Override
default void addToQueue(final Queue queue) {
    final Continuation continuation = thenSync(queue::add)
            .self(s -> {
                if (!this.getPopulator().isPoolingActive()) {
                    return;
                }
                s.peekSync(v -> {
                    throw new CompletedException(v);
                });
            })
            .runContinuation(() -> {
                throw new ClosedQueueException();
            });

    queue.addContinuation(continuation);
}
内容来源于网络,如有侵权,请联系作者删除!