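This article collects code examples for the Java method reactor.core.publisher.Mono.expandDeep() and shows how Mono.expandDeep() is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they should serve as a useful reference. Details of Mono.expandDeep() are as follows:
Package path: reactor.core.publisher.Mono
Class name: Mono
Method name: expandDeep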
Recursively expand elements into a graph and emit all the resulting elements, in a depth-first traversal order.
That is: emit the value from this Mono, expand it and emit the first value at this first level of recursion, and so on. When no more recursion is possible, backtrack to the previous level and re-apply the strategy.
For example, given the hierarchical structure
A
  - AA
    - aa1
  - AB
    - ab1
  - a1
Mono.just(A) expands into
A
AA
aa1
AB
ab1
a1
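The traversal order described above can be sketched in plain Java, without Reactor. This is only an illustrative analogue, not how reactor-core implements the operator: the class `ExpandDeepSketch`, its `Node` type, and the list-returning `expandDeep` helper are hypothetical names introduced for this example, and the sketch ignores backpressure, cancellation, and asynchrony.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

// Plain-Java analogue of the depth-first order; NOT Reactor's implementation.
class ExpandDeepSketch {

    // Hypothetical tree node, used only for this illustration.
    static final class Node {
        final String name;
        final List<Node> children;

        Node(String name, Node... children) {
            this.name = name;
            this.children = Arrays.asList(children);
        }
    }

    // Emits the seed, then fully expands each element before visiting its next sibling.
    static <T> List<T> expandDeep(T seed, Function<T, Iterable<T>> expander) {
        List<T> emitted = new ArrayList<>();
        Deque<Iterator<T>> stack = new ArrayDeque<>();
        stack.push(Collections.singletonList(seed).iterator());
        while (!stack.isEmpty()) {
            Iterator<T> current = stack.peek();
            if (!current.hasNext()) {
                stack.pop(); // this level is exhausted: backtrack to the previous one
                continue;
            }
            T value = current.next();
            emitted.add(value);
            stack.push(expander.apply(value).iterator()); // descend one level
        }
        return emitted;
    }

    // Builds the A / AA / AB / a1 hierarchy from the description and returns the visit order.
    static List<String> javadocExample() {
        Node root = new Node("A",
                new Node("AA", new Node("aa1")),
                new Node("AB", new Node("ab1")),
                new Node("a1"));
        List<String> names = new ArrayList<>();
        for (Node visited : expandDeep(root, node -> node.children)) {
            names.add(visited.name);
        }
        return names;
    }

    public static void main(String[] args) {
        System.out.println(javadocExample()); // prints [A, AA, aa1, AB, ab1, a1]
    }
}
```

The explicit stack of iterators mirrors the "backtrack to the previous level" wording: an exhausted iterator is popped, and traversal resumes with the next sibling one level up.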
Code example source: reactor/reactor-core
return expandDeep(expander, Queues.SMALL_BUFFER_SIZE);
Code example source: reactor/reactor-core
@Test
public void emptyDepth() {
    // countDown is a fixture of the surrounding test class (not shown in these excerpts)
    StepVerifier.create(Mono.<Integer>empty()
                            .expandDeep(countDown))
                .verifyComplete();
}
Code example source: reactor/reactor-core
@Test
public void expanderThrowsDepth() {
    // The source value is emitted first; the expander's exception then terminates the sequence
    StepVerifier.create(Mono.just(10)
                            .expandDeep(v -> {
                                throw new IllegalStateException("boom");
                            }))
                .expectNext(10)
                .verifyErrorMessage("boom");
}
Code example source: reactor/reactor-core
@Test
public void depthCancelRace2() throws Exception {
    for (int i = 0; i < 1000; i++) {
        final TestPublisher<Integer> pp = TestPublisher.create();
        Flux<Integer> source = Mono.just(0)
                                   .expandDeep(it -> pp);
        final CountDownLatch cdl = new CountDownLatch(1);
        AssertSubscriber<Integer> ts = new AssertSubscriber<Integer>() {
            final AtomicInteger sync = new AtomicInteger(2);
            @Override
            public void onNext(Integer t) {
                super.onNext(t);
                Schedulers.single().schedule(() -> {
                    // spin until the onNext thread has also reached its sync point
                    if (sync.decrementAndGet() != 0) {
                        while (sync.get() != 0) { }
                    }
                    cancel();
                    cdl.countDown();
                });
                // spin until the scheduled task has also reached its sync point
                if (sync.decrementAndGet() != 0) {
                    while (sync.get() != 0) { }
                }
            }
        };
        source.subscribe(ts);
        assertThat(cdl.await(5, TimeUnit.SECONDS)).as("runs under 5s").isTrue();
    }
}
Code example source: reactor/reactor-core
@Test
public void errorDepth() {
    StepVerifier.create(Mono.<Integer>error(new IllegalStateException("boom"))
                            .expandDeep(countDown))
                .verifyErrorMessage("boom");
}
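Code example source: reactor/reactor-core
@Test
public void expanderReturnsNullDepth() {
    // A null Publisher from the expander surfaces as a NullPointerException after the source value
    StepVerifier.create(Mono.just(10)
                            .expandDeep(v -> null))
                .expectNext(10)
                .verifyError(NullPointerException.class);
}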
Code example source: reactor/reactor-core
@Test
public void recursiveCountdownDepth() {
    StepVerifier.create(Mono.just(10)
                            .expandDeep(countDown))
                .expectNext(10, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0)
                .verifyComplete();
}
Code example source: reactor/reactor-core
@Test
public void javadocExampleDepthFirst() {
    List<String> depthFirstExpected = Arrays.asList(
            "A",
            "AA",
            "aa1",
            "AB",
            "ab1",
            "a1");
    // ROOT is a fixture of the surrounding test class (not shown in these excerpts)
    StepVerifier.create(
            Mono.just(ROOT)
                .expandDeep(v -> Flux.fromIterable(v.children))
                .map(n -> n.name))
                .expectNextSequence(depthFirstExpected)
                .verifyComplete();
}
Code example source: reactor/reactor-core
@Test
public void recursiveCountdownLoopDepth() {
    for (int i = 0; i < 1000; i = (i < 100 ? i + 1 : i + 50)) {
        String tag = "i = " + i + ", strategy = depth";
        List<Integer> list = new ArrayList<>();
        StepVerifier.create(Mono.just(i)
                                .expandDeep(countDown))
                    .expectSubscription()
                    .recordWith(() -> list)
                    .expectNextCount(i + 1)
                    .as(tag)
                    .verifyComplete();
        // the recorded values must count down from i to 0
        for (int j = 0; j <= i; j++) {
            assertThat(list.get(j).intValue())
                    .as(tag + ", " + list)
                    .isEqualTo(i - j);
        }
    }
}
Code example source: reactor/reactor-core
@Test
public void recursiveCountdownTakeDepth() {
    StepVerifier.create(Mono.just(10)
                            .expandDeep(countDown)
                            .take(5)
    )
                .expectNext(10, 9, 8, 7, 6)
                .verifyComplete();
}
Code example source: reactor/reactor-core
@Test(timeout = 5000)
public void depthFirst() {
    // createTest() is a helper of the surrounding test class (not shown in these excerpts)
    FluxExpandTest.Node root = createTest();
    StepVerifier.create(Mono.just(root)
                            .expandDeep(v -> Flux.fromIterable(v.children))
                            .map(v -> v.name))
                .expectNext(
                        "root",
                        "1", "11",
                        "2", "21", "22", "221",
                        "3", "31", "32", "321", "33", "331", "332", "3321",
                        "4", "41", "42", "421", "43", "431", "432", "4321",
                        "44", "441", "442", "4421", "443", "4431", "4432"
                )
                .verifyComplete();
}
Code example source: reactor/reactor-core
@Test
public void depthCancelRace() {
    for (int i = 0; i < 1000; i++) {
        final AssertSubscriber<Integer> ts = AssertSubscriber.create(0);
        Mono.just(0)
            .expandDeep(countDown)
            .subscribe(ts);
        // race a request against a cancellation
        Runnable r1 = () -> ts.request(1);
        Runnable r2 = ts::cancel;
        RaceTestUtils.race(r1, r2, Schedulers.single());
    }
}
Code example source: reactor/reactor-core
@Test
public void depthCompleteCancelRace() {
    for (int i = 0; i < 1000; i++) {
        final TestPublisher<Integer> pp = TestPublisher.create();
        final AssertSubscriber<Integer> ts = AssertSubscriber.create(1);
        Mono.just(0)
            .expandDeep(it -> pp)
            .subscribe(ts);
        // race completion of the inner publisher against a cancellation
        Runnable r1 = pp::complete;
        Runnable r2 = ts::cancel;
        RaceTestUtils.race(r1, r2, Schedulers.single());
    }
}
Code example source: reactor/reactor-core
@Test
public void depthEmitCancelRace() {
    for (int i = 0; i < 1000; i++) {
        final TestPublisher<Integer> pp = TestPublisher.create();
        final AssertSubscriber<Integer> ts = AssertSubscriber.create(1);
        Mono.just(0)
            .expandDeep(it -> pp)
            .subscribe(ts);
        // race an emission from the inner publisher against a cancellation
        Runnable r1 = () -> pp.next(1);
        Runnable r2 = ts::cancel;
        RaceTestUtils.race(r1, r2, Schedulers.single());
    }
}
Code example source: reactor/reactor-core
@Test
public void depthFirstAsync() {
    FluxExpandTest.Node root = createTest();
    // Schedulers.elastic() is deprecated in newer Reactor releases;
    // Schedulers.boundedElastic() is the usual replacement there
    StepVerifier.create(Mono.just(root)
                            .expandDeep(v -> Flux.fromIterable(v.children)
                                                 .subscribeOn(Schedulers.elastic()))
                            .map(v -> v.name))
                .expectNext(
                        "root",
                        "1", "11",
                        "2", "21", "22", "221",
                        "3", "31", "32", "321", "33", "331", "332", "3321",
                        "4", "41", "42", "421", "43", "431", "432", "4321",
                        "44", "441", "442", "4421", "443", "4431", "4432"
                )
                .expectComplete()
                .verify(Duration.ofSeconds(5));
}
Code example source: reactor/reactor-core
.expandDeep(it -> pp)
.subscribe(s);
Code example source: reactor/reactor-core
@Test
public void recursiveCountdownBackpressureDepth() {
    // start with no demand, then request in batches and check what each batch delivers
    StepVerifier.create(Mono.just(10)
                            .expandDeep(countDown),
            StepVerifierOptions.create()
                               .initialRequest(0)
                               .checkUnderRequesting(false))
                .thenRequest(1)
                .expectNext(10)
                .thenRequest(3)
                .expectNext(9, 8, 7)
                .thenRequest(4)
                .expectNext(6, 5, 4, 3)
                .thenRequest(3)
                .expectNext(2, 1, 0)
                .verifyComplete();
}
Code example source: io.projectreactor/reactor-core
return expandDeep(expander, Queues.SMALL_BUFFER_SIZE);