本文整理了Java中com.oath.cyclops.async.adapters.Queue.stream
方法的一些代码示例,展示了Queue.stream
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Queue.stream
方法的具体详情如下:
包路径:com.oath.cyclops.async.adapters.Queue
类名称:Queue
方法名:stream
暂无
代码示例来源:origin: aol/cyclops
@Override
public ReactiveSeq<T> stream(final Continueable s) {
return connect(q -> q.stream(s));
}
代码示例来源:origin: aol/cyclops
/**
* @return Infinite (until Queue is closed) Stream of CompletableFutures
* that can be used as input into a SimpleReact concurrent dataflow
*
* This Stream itself is Sequential, SimpleReact will applyHKT
* concurrency / parralellism via the constituent CompletableFutures
*
*/
@Override
public ReactiveSeq<CompletableFuture<T>> streamCompletableFutures() {
return stream().map(CompletableFuture::completedFuture);
}
代码示例来源:origin: aol/cyclops
/**
* Generating a stream will register the Stream as a reactiveSubscriber to this topic.
* It will be provided with an internal Queue as a mailbox. @see Topic.disconnect to disconnect from the topic
* @return Stream of data
*/
@Override
public ReactiveSeq<T> stream() {
return connect(q -> q.stream());
}
代码示例来源:origin: aol/cyclops
private Stream<T> genJdkStream() {
final Continueable subscription = new com.oath.cyclops.react.async.subscription.Subscription();
return queue.stream(subscription);
}
代码示例来源:origin: aol/cyclops
@Override
public Stream<T> unwrapStream() {
if (async == Type.NO_BACKPRESSURE) {
Queue<T> queue = QueueFactories.<T>unboundedNonBlockingQueue()
.build();
AtomicBoolean wip = new AtomicBoolean(false);
Continuation cont = new Continuation(() -> {
if (wip.compareAndSet(false, true)) {
this.source.subscribeAll(queue::offer, i -> {
queue.close();
}, () -> queue.close());
}
return Continuation.empty();
});
queue.addContinuation(cont);
return queue.stream();
}
return StreamSupport.stream(new OperatorToIterable<>(source, this.defaultErrorHandler, async == BACKPRESSURE).spliterator(), false);
}
代码示例来源:origin: aol/cyclops
@Override
public Iterator<T> iterator() {
if (async == Type.NO_BACKPRESSURE) {
Queue<T> queue = QueueFactories.<T>unboundedNonBlockingQueue()
.build();
AtomicBoolean wip = new AtomicBoolean(false);
Subscription[] sub = {null};
Continuation cont = new Continuation(() -> {
if (wip.compareAndSet(false, true)) {
this.source.subscribeAll(queue::offer,
i -> queue.close(),
() -> queue.close());
}
return Continuation.empty();
});
queue.addContinuation(cont);
return queue.stream().iterator();
}
return new OperatorToIterable<>(source, this.defaultErrorHandler, async == BACKPRESSURE).iterator();
}
代码示例来源:origin: aol/cyclops
@Test
public void publishTest() {
for (int k = 0; k < ITERATIONS; k++) {
Queue<Integer> queue = QueueFactories.<Integer>boundedNonBlockingQueue(10)
.build();
Thread t = new Thread(() -> {
try {
System.out.println("Sleeping!");
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Waking!");
System.out.println("Closing! " + queue.size());
queue.close();
});
t.start();
of(1, 2, 3).peek(i -> System.out.println("publishing " + i))
.publishTo(queue)
.forEach(System.out::println);
assertThat(queue.stream().collect(Collectors.toList()), equalTo(Arrays.asList(1, 2, 3)));
t = null;
System.gc();
}
}
代码示例来源:origin: aol/cyclops
return queue.stream();
代码示例来源:origin: aol/cyclops
return queue.stream();
代码示例来源:origin: aol/cyclops
@Test
public void queueTest(){
com.oath.cyclops.async.adapters.Queue<Integer> q = new Queue<>();
q.add(1);
q.add(2);
q.add(3);
q.stream().limit(3).forEach(System.out::println);
q.add(4);
q.add(5);
q.stream().limit(2).forEach(System.out::println);
}
@Test
代码示例来源:origin: com.oath.cyclops/cyclops
/**
* @return Infinite (until Queue is closed) Stream of CompletableFutures
* that can be used as input into a SimpleReact concurrent dataflow
*
* This Stream itself is Sequential, SimpleReact will applyHKT
* concurrency / parralellism via the constituent CompletableFutures
*
*/
@Override
public ReactiveSeq<CompletableFuture<T>> streamCompletableFutures() {
return stream().map(CompletableFuture::completedFuture);
}
代码示例来源:origin: com.oath.cyclops/cyclops
/**
* Generating a stream will register the Stream as a reactiveSubscriber to this topic.
* It will be provided with an internal Queue as a mailbox. @see Topic.disconnect to disconnect from the topic
* @return Stream of data
*/
@Override
public ReactiveSeq<T> stream() {
return connect(q -> q.stream());
}
代码示例来源:origin: com.oath.cyclops/cyclops
@Override
public ReactiveSeq<T> stream(final Continueable s) {
return connect(q -> q.stream(s));
}
代码示例来源:origin: com.oath.cyclops/cyclops
private Stream<T> genJdkStream() {
final Continueable subscription = new com.oath.cyclops.react.async.subscription.Subscription();
return queue.stream(subscription);
}
代码示例来源:origin: com.oath.cyclops/cyclops-futurestream
private Stream<T> genJdkStream() {
final Continueable subscription = new com.oath.cyclops.react.async.subscription.Subscription();
return queue.stream(subscription);
}
代码示例来源:origin: com.oath.cyclops/cyclops-futurestream
/**
* Wrap a Stream into a SimpleReactStream.
*/
static <T> SimpleReactStream<T> simpleReactStream(Stream<T> stream) {
if (stream instanceof FutureStream)
stream = ((FutureStream) stream).toQueue()
.stream(((FutureStream) stream).getSubscription());
final SimpleReact sr = new SimpleReact(
ThreadPools.getCurrentThreadExecutor(),
false);
return new SimpleReactStreamImpl<T>(
sr, stream.map(CompletableFuture::completedFuture));
}
代码示例来源:origin: com.oath.cyclops/cyclops-futurestream
private FutureStream<T> genStream() {
final Continueable subscription = new com.oath.cyclops.react.async.subscription.Subscription();
return new LazyReact().of()
.withSubscription(subscription)
.fromStream(queue.stream(subscription));
}
代码示例来源:origin: com.oath.cyclops/cyclops-futurestream
default Iterator<U> iterator() {
final Queue<U> q = toQueue();
if (getSubscription().closed())
return new CloseableIterator<>(
Arrays.<U> asList()
.iterator(),
getSubscription(), null);
return new CloseableIterator<>(
q.stream(getSubscription())
.iterator(),
getSubscription(), q);
}
代码示例来源:origin: com.oath.cyclops/cyclops-futurestream
@Override
default FutureStream<U> limit(final long maxSize) {
final Continueable sub = this.getSubscription();
sub.registerLimit(maxSize);
return fromStream(ReactiveSeq.oneShotStream(toQueue().stream(sub))
.limit(maxSize));
}
代码示例来源:origin: com.oath.cyclops/cyclops-futurestream
@Override
default FutureStream<U> skip(final long n) {
final Continueable sub = this.getSubscription();
sub.registerSkip(n);
return fromStream(ReactiveSeq.oneShotStream(toQueue().stream(sub))
.skip(n));
}
内容来源于网络,如有侵权,请联系作者删除!