Usage and code examples of the rx.Observable.flatMapSingle() method

x33g5p2x, reposted on 2022-01-25 under Other

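This article collects code examples of the Java method rx.Observable.flatMapSingle() and shows how Observable.flatMapSingle() is used in practice. The examples come mainly from GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they should serve as a useful reference. Details of the Observable.flatMapSingle() method:
Fully qualified class name: rx.Observable
Class name: Observable
Method name: flatMapSingle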

Introduction to Observable.flatMapSingle

None available.
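
The examples that follow all share one shape: every item of the source Observable is mapped to a Single, and the single results are merged back into one Observable. As a rough illustration of that shape only, here is a JDK-only analogy in which `CompletableFuture` stands in for `rx.Single` and a `List` stands in for the `Observable`. It is not RxJava code: `FlatMapSingleSketch` and `flatMapSingleLike` are hypothetical names, and the sketch ignores backpressure, unsubscription, and RxJava's error-handling rules.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Function;

class FlatMapSingleSketch {
    /**
     * Hypothetical helper (an assumption for illustration, not an RxJava API):
     * maps every item to one asynchronous result and merges the results.
     */
    static <T, R> CompletableFuture<List<R>> flatMapSingleLike(
            List<T> items, Function<T, CompletableFuture<R>> mapper) {
        Queue<R> merged = new ConcurrentLinkedQueue<>();
        List<CompletableFuture<Void>> pending = new ArrayList<>();
        for (T item : items) {
            // Start one inner computation per item and record its result when it arrives.
            pending.add(mapper.apply(item).thenAccept(merged::add));
        }
        // Complete once every inner computation has delivered its single value.
        return CompletableFuture
            .allOf(pending.toArray(new CompletableFuture<?>[0]))
            .thenApply(done -> new ArrayList<R>(merged));
    }

    public static void main(String[] args) {
        List<Integer> lengths = flatMapSingleLike(
            Arrays.asList("a", "bb", "ccc"),
            s -> CompletableFuture.completedFuture(s.length())).join();
        System.out.println(lengths); // [1, 2, 3]
    }
}
```

With already-completed futures, as in `main`, the results are recorded in source order; with genuinely asynchronous futures they would be collected in completion order instead.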

Code examples

Code example source: georocket/georocket

/**
 * Determine the sizes of all given files
 * @param files the files
 * @param vertx the Vert.x instance
 * @return an observable that emits pairs of file names and sizes
 */
private Observable<Pair<String, Long>> getFileSizes(List<String> files, Vertx vertx) {
 FileSystem fs = vertx.fileSystem();
 return Observable.from(files)
  .flatMapSingle(path -> fs.rxProps(path).map(props -> Pair.of(path, props.size())));
}

Code example source: couchbase/java-dcp-client

/**
 * Helper method to return the failover logs for the given partitions (vbids).
 *
 * If the list is empty, the failover logs for all partitions will be returned. Note that the returned
 * ByteBufs can be analyzed using the {@link DcpFailoverLogResponse} flyweight.
 *
 * @param vbids the partitions to return the failover logs from.
 * @return an {@link Observable} containing all failover logs.
 */
public Observable<ByteBuf> failoverLogs(Short... vbids) {
  List<Short> partitions = partitionsForVbids(numPartitions(), vbids);
  LOGGER.debug("Asking for failover logs on partitions {}", partitions);
  return Observable
      .from(partitions)
      .flatMapSingle(new Func1<Short, Single<ByteBuf>>() {
        @Override
        public Single<ByteBuf> call(Short p) {
          return conductor.getFailoverLog(p);
        }
      });
}

Code example source: org.jboss.hal/hal-flow

/** Executes multiple tasks in order. */
static <C extends FlowContext> Single<C> series(C context, Collection<? extends Task<C>> tasks) {
  return Observable.from(tasks)
      // delayErrors = false, maxConcurrency = 1: only one task is subscribed to at a time
      .flatMapSingle(task -> task.call(context).toSingleDefault(context), false, 1)
      .doOnSubscribe(() -> context.progress.reset(tasks.size()))
      .doOnNext(c -> c.progress.tick())
      .doOnTerminate(context.progress::finish)
      .lastOrDefault(context).toSingle();
}
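
In the `series` example above, the second and third arguments passed to `flatMapSingle` are `delayErrors` and `maxConcurrency`; a `maxConcurrency` of 1 means at most one inner Single is active at a time, which is what makes the tasks run in order. The same run-in-order idea can be sketched with the JDK alone, assuming `CompletableFuture` as a stand-in for the Rx types. `SeriesSketch` and `runInSeries` are hypothetical names, not part of hal-flow or RxJava.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

class SeriesSketch {
    /**
     * Hypothetical helper (an assumption for illustration): runs the tasks one
     * after another against a shared context and finally yields that context.
     */
    static <C> CompletableFuture<C> runInSeries(
            C context, List<Function<C, CompletableFuture<Void>>> tasks) {
        CompletableFuture<C> chain = CompletableFuture.completedFuture(context);
        for (Function<C, CompletableFuture<Void>> task : tasks) {
            // thenCompose defers task.apply(...) until the chain so far has completed,
            // so a task is only started after its predecessor has finished.
            chain = chain.thenCompose(ctx -> task.apply(ctx).thenApply(ignored -> ctx));
        }
        return chain;
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        List<Function<List<String>, CompletableFuture<Void>>> tasks = Arrays.asList(
            ctx -> CompletableFuture.runAsync(() -> ctx.add("first")),
            ctx -> CompletableFuture.runAsync(() -> ctx.add("second")),
            ctx -> CompletableFuture.runAsync(() -> ctx.add("third")));
        System.out.println(runInSeries(log, tasks).join()); // [first, second, third]
    }
}
```

Because each task starts only after the previous future completes, the log order matches the task order even though every task runs asynchronously.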

Code example source: georocket/georocket

.flatMapSingle(pair -> {
 String path = pair.getKey().getKey();
 Long size = pair.getKey().getValue();

Code example source: couchbase/java-dcp-client

@SuppressWarnings("unchecked")
public Single<ByteBuf> getFailoverLog(final short partition) {
  return Observable
      .just(partition)
      .map(new Func1<Short, DcpChannel>() {
        @Override
        public DcpChannel call(Short aShort) {
          return masterChannelByPartition(partition);
        }
      })
      .flatMapSingle(new Func1<DcpChannel, Single<ByteBuf>>() {
        @Override
        public Single<ByteBuf> call(DcpChannel channel) {
          return channel.getFailoverLog(partition);
        }
      })
      .retryWhen(anyOf(NotConnectedException.class)
          .max(Integer.MAX_VALUE)
          .delay(Delay.fixed(200, TimeUnit.MILLISECONDS))
          .doOnRetry(new Action4<Integer, Throwable, Long, TimeUnit>() {
            @Override
            public void call(Integer integer, Throwable throwable, Long aLong, TimeUnit timeUnit) {
              LOGGER.debug("Rescheduling Get Failover Log for vbid {}, not connected (yet).", partition);
            }
          })
          .build()
      ).toSingle();
}

Code example source: org.jboss.hal/hal-dmr

/**
 * Executes the composite operation until the operation successfully returns and the precondition is met.
 * The precondition receives the composite result of the operation.
 */
@SuppressWarnings("HardCodedStringLiteral")
public static Completable repeatCompositeUntil(Dispatcher dispatcher, int timeout, Composite composite,
    @Nullable Predicate<CompositeResult> until) {
  logger.debug("Repeat {} using {} seconds as timeout", composite, timeout);
  Single<CompositeResult> execution = Single.fromEmitter(em -> dispatcher.execute(composite, em::onSuccess,
      (op, fail) -> em.onSuccess(compositeFailure("Dispatcher failure: " + fail)),
      (op, ex) -> em.onSuccess(compositeFailure("Dispatcher exception: " + ex.getMessage()))));
  if (until == null) {
    until = r -> r.stream().noneMatch(ModelNode::isFailure); // default: until success
  }
  return Observable
      .interval(INTERVAL, MILLISECONDS) // execute an operation every INTERVAL millis
      .doOnEach(n -> logger.debug("#{}: execute {}", n.getValue(), composite))
      .flatMapSingle(n -> execution, false, 1)
      .takeUntil(until::test) // until succeeded
      .toCompletable().timeout(timeout, SECONDS); // wait for success or stop after timeout seconds
}

Code example source: georocket/georocket

.map(RxStoreCursor::new)
.flatMapObservable(RxStoreCursor::toObservable)
.flatMapSingle(p -> store.rxGetOne(p.getRight())
 .flatMap(crs -> merger.merge(crs, p.getLeft(), out)

Code example source: georocket/georocket

/**
 * Imports a JSON file from the given input stream into the store
 * @param f the JSON file to read
 * @param correlationId a unique identifier for this import process
 * @param filename the name of the file currently being imported
 * @param timestamp denotes when the import process has started
 * @param layer the layer where the file should be stored (may be null)
 * @param tags the list of tags to attach to the file (may be null)
 * @param properties the map of properties to attach to the file (may be null)
 * @return an observable that will emit the number 1 when a chunk has been imported
 */
protected Observable<Integer> importJSON(ReadStream<Buffer> f, String correlationId,
  String filename, long timestamp, String layer, List<String> tags, Map<String, Object> properties) {
 UTF8BomFilter bomFilter = new UTF8BomFilter();
 StringWindow window = new StringWindow();
 GeoJsonSplitter splitter = new GeoJsonSplitter(window);
 AtomicInteger processing = new AtomicInteger(0);
 return f.toObservable()
   .map(Buffer::getDelegate)
   .map(bomFilter::filter)
   .doOnNext(window::append)
   .compose(new JsonParserTransformer())
   .flatMap(splitter::onEventObservable)
   .flatMapSingle(result -> {
    IndexMeta indexMeta = new IndexMeta(correlationId, filename,
      timestamp, tags, properties, null);
    return addToStoreWithPause(result, layer, indexMeta, f, processing)
      .toSingleDefault(1);
   });
}

Code example source: couchbase/java-dcp-client

@SuppressWarnings("unchecked")
private Observable<ByteBuf> getSeqnosForChannel(final DcpChannel channel) {
  return Observable
      .just(channel)
      .flatMapSingle(new Func1<DcpChannel, Single<ByteBuf>>() {
        @Override
        public Single<ByteBuf> call(DcpChannel channel) {
          return channel.getSeqnos();
        }
      })
      .retryWhen(anyOf(NotConnectedException.class)
          .max(Integer.MAX_VALUE)
          .delay(Delay.fixed(200, TimeUnit.MILLISECONDS))
          .doOnRetry(new Action4<Integer, Throwable, Long, TimeUnit>() {
            @Override
            public void call(Integer integer, Throwable throwable, Long aLong, TimeUnit timeUnit) {
              LOGGER.debug("Rescheduling get Seqnos for channel {}, not connected (yet).", channel);
            }
          })
          .build()
      );
}

Code example source: org.jboss.hal/hal-dmr

/**
 * Executes the operation until the operation successfully returns and the precondition is met. The precondition
 * receives the result of the operation.
 */
@SuppressWarnings("HardCodedStringLiteral")
public static Completable repeatOperationUntil(Dispatcher dispatcher, int timeout, Operation operation,
    @Nullable Predicate<ModelNode> until) {
  logger.debug("Repeat {} using {} seconds timeout", operation.asCli(), timeout);
  Single<ModelNode> execution = Single.fromEmitter(em -> dispatcher.execute(operation, em::onSuccess,
      (op, fail) -> em.onSuccess(operationFailure("Dispatcher failure: " + fail)),
      (op, ex) -> em.onSuccess(operationFailure("Dispatcher exception: " + ex.getMessage()))));
  if (until == null) {
    until = r -> !r.isFailure(); // default: until success
  }
  return Observable
      .interval(INTERVAL, MILLISECONDS) // execute an operation every INTERVAL millis
      .doOnEach(n -> logger.debug("#{}: execute {}", n.getValue(), operation.asCli()))
      .flatMapSingle(n -> execution, false, 1)
      .takeUntil(until::test) // until succeeded
      .toCompletable().timeout(timeout, SECONDS); // wait for success or stop after timeout seconds
}

Code example source: georocket/georocket

.flatMapSingle(result -> {
 String crsString = fallbackCRSString;
 if (crsIndexer.getCRS() != null) {

Code example source: georocket/georocket

s = metas;
} else {
 s = metas.flatMapSingle(meta -> m.init(meta).toSingleDefault(meta));

Code example source: georocket/georocket

private void doMerge(TestContext context, Observable<Buffer> chunks,
  Observable<ChunkMeta> metas, String jsonContents) {
 MultiMerger m = new MultiMerger(false);
 BufferWriteStream bws = new BufferWriteStream();
 Async async = context.async();
 metas
  .flatMapSingle(meta -> m.init(meta).toSingleDefault(meta))
  .toList()
  .flatMap(l -> chunks.map(DelegateChunkReadStream::new)
    .<ChunkMeta, Pair<ChunkReadStream, ChunkMeta>>zipWith(l, Pair::of))
  .flatMapCompletable(p -> m.merge(p.getLeft(), p.getRight(), bws))
  .toCompletable()
  .subscribe(() -> {
   m.finish(bws);
   context.assertEquals(jsonContents, bws.getBuffer().toString("utf-8"));
   async.complete();
  }, context::fail);
}

Code example source: georocket/georocket

private void doMerge(TestContext context, Observable<Buffer> chunks,
  Observable<GeoJsonChunkMeta> metas, String jsonContents, boolean optimistic) {
 GeoJsonMerger m = new GeoJsonMerger(optimistic);
 BufferWriteStream bws = new BufferWriteStream();
 Async async = context.async();
 Observable<GeoJsonChunkMeta> s;
 if (optimistic) {
  s = metas;
 } else {
  s = metas.flatMapSingle(meta -> m.init(meta).toSingleDefault(meta));
 }
 s.toList()
  .flatMap(l -> chunks.map(DelegateChunkReadStream::new)
    .<GeoJsonChunkMeta, Pair<ChunkReadStream, GeoJsonChunkMeta>>zipWith(l, Pair::of))
  .flatMapCompletable(p -> m.merge(p.getLeft(), p.getRight(), bws))
  .toCompletable()
  .subscribe(() -> {
   m.finish(bws);
   context.assertEquals(jsonContents, bws.getBuffer().toString("utf-8"));
   async.complete();
  }, context::fail);
}

Code example source: georocket/georocket

/**
 * Test if a service can be published and unpublished again
 * @param context the test context
 */
@Test
public void unpublish(TestContext context) {
 Vertx vertx = new Vertx(rule.vertx());
 Async async = context.async();

 ServiceDiscovery discovery = ServiceDiscovery.create(vertx);
 Service.publishOnce("A", "a", discovery, vertx)
  .andThen(Service.discover("A", discovery, vertx))
  .count()
  .doOnNext(count -> context.assertEquals(1, count))
  .flatMap(v -> Service.discover("A", discovery, vertx))
  .flatMapSingle(service -> service.unpublish(discovery).toSingleDefault(0))
  .flatMap(v -> Service.discover("A", discovery, vertx))
  .count()
  .doOnTerminate(discovery::close)
  .subscribe(count -> {
   context.assertEquals(0, count);
   async.complete();
  }, context::fail);
}
