Usage and code examples of the reactor.core.publisher.Mono.flatMapIterable() method

Reposted by x33g5p2x on 2022-01-24 in Other

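This article collects code examples of the Java method reactor.core.publisher.Mono.flatMapIterable() and shows how Mono.flatMapIterable() is used in practice. The examples come mainly from GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they should serve as useful references. Details of the Mono.flatMapIterable() method are as follows:
Package path: reactor.core.publisher.Mono
Class name: Mono
Method name: flatMapIterable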

Introduction to Mono.flatMapIterable

Transforms the item emitted by this Mono into an Iterable, then forwards its elements into the returned Flux. The Javadoc also mentions a prefetch argument that sets an arbitrary prefetch size for the inner Iterable, although every call in the examples below passes only the mapper function.
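
To make the description above concrete, here is a minimal plain-JDK sketch of the same idea. It is only an analogy and not Reactor's implementation: `Optional<T>` stands in for a `Mono<T>` (zero or one item), a `List<R>` stands in for the resulting `Flux<R>`, and the class name `FlatMapIterableModel` is hypothetical. It ignores everything reactive (laziness, backpressure, error signals).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;

public class FlatMapIterableModel {

    // Hypothetical stand-in for Mono.flatMapIterable:
    // Optional<T> models the source Mono, List<R> models the returned Flux.
    public static <T, R> List<R> flatMapIterable(
            Optional<T> source,
            Function<? super T, ? extends Iterable<? extends R>> mapper) {
        List<R> out = new ArrayList<>();
        // An empty source produces an empty result; the mapper is never invoked.
        source.ifPresent(item -> {
            // A present item is mapped to an Iterable whose elements are forwarded in order.
            for (R element : mapper.apply(item)) {
                out.add(element);
            }
        });
        return out;
    }

    public static void main(String[] args) {
        List<String> parts =
                flatMapIterable(Optional.of("a,b,c"), s -> Arrays.asList(s.split(",")));
        System.out.println(parts); // prints [a, b, c]

        List<String> none =
                flatMapIterable(Optional.<String>empty(), s -> Arrays.asList(s.split(",")));
        System.out.println(none); // prints []
    }
}
```

The mapper parameter type mirrors the signature shown in the cyclops-reactor example further down. In real Reactor code the same shape appears as a `Mono` followed by `.flatMapIterable(mapper)`, as in the project examples that follow.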

Code examples

Code example source: codecentric/spring-boot-admin

@Override
public Flux<Instance> findAll() {
  return Mono.fromSupplier(this.snapshots::values).flatMapIterable(Function.identity());
}

Code example source: spring-projects/spring-security

@Override
public Mono<AuthorizationDecision> check(Mono<Authentication> authentication, T object) {
  return authentication
    .filter(a -> a.isAuthenticated())
    .flatMapIterable( a -> a.getAuthorities())
    .map(g -> g.getAuthority())
    .any(a -> this.authorities.contains(a))
    .map( hasAuthority -> new AuthorizationDecision(hasAuthority))
    .defaultIfEmpty(new AuthorizationDecision(false));
}

Code example source: reactor/reactor-core

/**
 * Sort elements from this {@link Flux} using a {@link Comparator} function, by
 * collecting and sorting elements in the background then emitting the sorted sequence
 * once this sequence completes.
 *
 * <p>Note that calling {@code sort} with long, non-terminating or infinite sources
 * might cause {@link OutOfMemoryError}
 *
 * @param sortFunction a function that compares two items emitted by this {@link Flux}
 * to indicate their sort order
 * @return a sorted {@link Flux}
 */
public final Flux<T> sort(Comparator<? super T> sortFunction) {
  return collectSortedList(sortFunction).flatMapIterable(identityFunction());
}

Code example source: reactor/reactor-core

/**
 * Sort elements from this {@link Flux} by collecting and sorting them in the background
 * then emitting the sorted sequence once this sequence completes.
 * Each item emitted by the {@link Flux} must implement {@link Comparable} with
 * respect to all other items in the sequence.
 *
 * <p>Note that calling {@code sort} with long, non-terminating or infinite sources
 * might cause {@link OutOfMemoryError}. Use sequence splitting like {@link #window} to sort batches in that case.
 *
 * @throws ClassCastException if any item emitted by the {@link Flux} does not implement
 * {@link Comparable} with respect to all other items emitted by the {@link Flux}
 * @return a sorted {@link Flux}
 */
public final Flux<T> sort(){
  return collectSortedList().flatMapIterable(identityFunction());
}

Code example source: lettuce-io/lettuce-core

@SuppressWarnings({ "unchecked", "rawtypes" })
public Flux<KeyValue<K, V>> mget(Iterable<K> keys) {
  List<K> keyList = LettuceLists.newList(keys);
  Map<Integer, List<K>> partitioned = SlotHash.partition(codec, keyList);
  if (partitioned.size() < 2) {
    return super.mget(keyList);
  }
  List<Publisher<KeyValue<K, V>>> publishers = new ArrayList<>();
  for (Map.Entry<Integer, List<K>> entry : partitioned.entrySet()) {
    publishers.add(super.mget(entry.getValue()));
  }
  Flux<KeyValue<K, V>> fluxes = Flux.concat(publishers);
  Mono<List<KeyValue<K, V>>> map = fluxes.collectList().map(vs -> {
    KeyValue<K, V>[] values = new KeyValue[vs.size()];
    int offset = 0;
    for (Map.Entry<Integer, List<K>> entry : partitioned.entrySet()) {
      for (int i = 0; i < keyList.size(); i++) {
        int index = entry.getValue().indexOf(keyList.get(i));
        if (index == -1) {
          continue;
        }
        values[i] = vs.get(offset + index);
      }
      offset += entry.getValue().size();
    }
    return Arrays.asList(values);
  });
  return map.flatMapIterable(keyValues -> keyValues);
}

Code example source: org.springframework.security/spring-security-core

@Override
public Mono<AuthorizationDecision> check(Mono<Authentication> authentication, T object) {
  return authentication
    .filter(a -> a.isAuthenticated())
    .flatMapIterable( a -> a.getAuthorities())
    .map( g-> g.getAuthority())
    .hasElement(this.authority)
    .map( hasAuthority -> new AuthorizationDecision(hasAuthority))
    .defaultIfEmpty(new AuthorizationDecision(false));
}

Code example source: reactor/reactor-core

@Test
public void shallExecuteSideEffectsCallback() {
  Flux<Integer> result = Mono.just(Arrays.asList(1, 2))
                //.doOnEach(sig -> System.out.println("SIGNAL beforeMap " + sig))// <- if enabled then everything is fine
                .map(x -> x)
                .doOnEach(sig -> {throw new RuntimeException("expected");})
                .flatMapIterable(Function.identity());
  StepVerifier.create(result).expectError().verify();
}

Code example source: reactor/reactor-core

@Test
public void syncDrainWithPollFailure() {
  Flux<Integer> p = Mono.just(Arrays.asList(1, 2, 3))
      .filter(l -> { throw new IllegalStateException("boom"); })
      .flatMapIterable(Function.identity());

  StepVerifier.create(p)
        .expectErrorMessage("boom")
        .verify(Duration.ofSeconds(1));
}

Code example source: reactor/reactor-core

public void asyncDrainWithPollFailure() {
  Flux<Integer> p = Flux.range(1, 3)
            .collectList()
            .filter(l -> { throw new IllegalStateException("boom"); })
            .flatMapIterable(Function.identity());
  StepVerifier.create(p)
        .expectErrorMessage("boom")
        .verify(Duration.ofSeconds(1));
}

Code example source: reactor/reactor-core

@Test(timeout = 1000)
public void gh841_workaroundFlux() {
  Flux<String> source = Flux.<String>create(sink -> {
    sink.next("a");
    sink.next("b");
    sink.complete();
  })
      .collectSortedList((a, b) -> { throw new IllegalStateException("boom"); })
      .hide()
      .flatMapIterable(Function.identity());
  StepVerifier.create(source)
        .expectErrorSatisfies(e -> assertThat(e).isInstanceOf(IllegalStateException.class)
        .hasMessage("boom"))
        .verify();
}

Code example source: reactor/reactor-core

@Test
public void contextDiscardCaptureWithNoInitialContext() {
  StepVerifier.create(Mono.subscriberContext()
              .flatMapIterable(ctx -> ctx.stream()
                            .map(Map.Entry::getKey)
                            .map(String::valueOf)
                            .collect(Collectors.toList())
              ).concatWithValues("A", "B")
              .filter(s -> s.length() > 1)
  )
        .expectNext("reactor.onDiscard.local")
        .expectComplete()
        .verifyThenAssertThat()
        .hasDiscardedExactly("A", "B");
}

Code example source: reactor/reactor-core

@Test(timeout = 1000)
public void gh841_workaroundStream() {
  Flux<String> source = Flux.<String>create(sink -> {
    sink.next("a");
    sink.next("b");
    sink.complete();
  })
      .collectSortedList((a, b) -> { throw new IllegalStateException("boom"); })
      .hide()
      .flatMapIterable(Function.identity());

  assertThatExceptionOfType(IllegalStateException.class)
      .isThrownBy(() -> source.toStream()
                  .collect(Collectors.toSet()))
      .withMessage("boom");
}

Code example source: reactor/reactor-core

@Test
public void contextDiscardCaptureWithInitialContext() {
  Context initial = Context.of("foo", "bar");
  StepVerifier.create(Mono.subscriberContext()
      .flatMapIterable(ctx -> ctx.stream()
                    .map(Map.Entry::getKey)
                    .map(String::valueOf)
                    .collect(Collectors.toList())
              ).concatWithValues("A", "B")
              .filter(s -> s.length() > 1)
      , StepVerifierOptions.create().withInitialContext(initial))
        .expectNext("foo")
        .expectNext("reactor.onDiscard.local")
        .expectComplete()
        .verifyThenAssertThat()
        .hasDiscardedExactly("A", "B");
}

Code example source: reactor/reactor-core

/**
 * See https://github.com/reactor/reactor-core/issues/453
 */
@Test
public void testDrainAsyncCompletesSeveralBatches() {
  StepVerifier.create(Flux.range(0, 72)
              .collectList()
              .flatMapIterable(Function.identity())
              .zipWith(Flux.range(1000, 100))
              .count())
        .expectNext(72L)
        .verifyComplete();
}

Code example source: com.aol.cyclops/cyclops-reactor

/**
 * @param mapper
 * @return
 * @see reactor.core.publisher.Mono#flatMapIterable(java.util.function.Function)
 */
public final <R> Flux<R> flatMapIterable(Function<? super T, ? extends Iterable<? extends R>> mapper) {
  return boxed.flatMapIterable(mapper);
}

Code example source: io.netifi.proteus/tracing-openzipkin

Flux<Trace> streamTraces(Mono<InputStream> input) {
 return input.map(is -> {
  try {
   return objectMapper.<Traces>readValue(
     is,
     new TypeReference<Traces>() {
     });
  } catch (IOException e) {
   throw Exceptions.propagate(e);
  }
 }).flatMapIterable(Traces::getTracesList);
}

Code example source: org.springframework.vault/spring-vault-core

@Override
@SuppressWarnings("unchecked")
public Flux<String> list(String path) {
  Assert.hasText(path, "Path must not be empty");
  Mono<VaultListResponse> read = doRead(
      String.format("%s?list=true", path.endsWith("/") ? path : (path + "/")),
      VaultListResponse.class);
  return read
      .filter(response -> response.getData() != null
          && response.getData().containsKey("keys"))
      //
      .flatMapIterable(
          response -> (List<String>) response.getRequiredData().get("keys"));
}

Code example source: org.cloudfoundry/cloudfoundry-operations

private static Mono<String> getRouterGroupId(RoutingClient routingClient, String routerGroup) {
  return requestListRouterGroups(routingClient)
    .flatMapIterable(ListRouterGroupsResponse::getRouterGroups)
    .filter(group -> routerGroup.equals(group.getName()))
    .single()
    .map(org.cloudfoundry.routing.v1.routergroups.RouterGroup::getRouterGroupId);
}

Code example source: apache/servicemix-bundles

@Override
public Mono<AuthorizationDecision> check(Mono<Authentication> authentication, T object) {
  return authentication
    .filter(a -> a.isAuthenticated())
    .flatMapIterable( a -> a.getAuthorities())
    .map( g-> g.getAuthority())
    .hasElement(this.authority)
    .map( hasAuthority -> new AuthorizationDecision(hasAuthority))
    .defaultIfEmpty(new AuthorizationDecision(false));
}

Code example source: org.cloudfoundry/cloudfoundry-operations

private static Flux<LogMessage> getLogs(Mono<DopplerClient> dopplerClient, String applicationId, Boolean recent) {
  if (Optional.ofNullable(recent).orElse(false)) {
    return requestLogsRecent(dopplerClient, applicationId)
      .filter(e -> EventType.LOG_MESSAGE == e.getEventType())
      .map(Envelope::getLogMessage)
      .collectSortedList(LOG_MESSAGE_COMPARATOR)
      .flatMapIterable(d -> d);
  } else {
    return requestLogsStream(dopplerClient, applicationId)
      .filter(e -> EventType.LOG_MESSAGE == e.getEventType())
      .map(Envelope::getLogMessage)
      .compose(SortingUtils.timespan(LOG_MESSAGE_COMPARATOR, LOG_MESSAGE_TIMESPAN));
  }
}
