rx.Observable.flatMapSingle()方法的使用及代码示例

x33g5p2x  于2022-01-25 转载在 其他  
字(9.8k)|赞(0)|评价(0)|浏览(237)

本文整理了Java中rx.Observable.flatMapSingle()方法的一些代码示例,展示了Observable.flatMapSingle()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Observable.flatMapSingle()方法的具体详情如下:
包路径:rx.Observable
类名称:Observable
方法名:flatMapSingle

Observable.flatMapSingle介绍

暂无

代码示例

代码示例来源:origin: georocket/georocket

  1. /**
  2. * Determine the sizes of all given files
  3. * @param files the files
  4. * @param vertx the Vert.x instance
  5. * @return an observable that emits pairs of file names and sizes
  6. */
  7. private Observable<Pair<String, Long>> getFileSizes(List<String> files, Vertx vertx) {
  8. FileSystem fs = vertx.fileSystem();
  9. return Observable.from(files)
  10. .flatMapSingle(path -> fs.rxProps(path).map(props -> Pair.of(path, props.size())));
  11. }

代码示例来源:origin: couchbase/java-dcp-client

  1. /**
  2. * Helper method to return the failover logs for the given partitions (vbids).
  3. *
  4. * If the list is empty, the failover logs for all partitions will be returned. Note that the returned
  5. * ByteBufs can be analyzed using the {@link DcpFailoverLogResponse} flyweight.
  6. *
  7. * @param vbids the partitions to return the failover logs from.
  8. * @return an {@link Observable} containing all failover logs.
  9. */
  10. public Observable<ByteBuf> failoverLogs(Short... vbids) {
  11. List<Short> partitions = partitionsForVbids(numPartitions(), vbids);
  12. LOGGER.debug("Asking for failover logs on partitions {}", partitions);
  13. return Observable
  14. .from(partitions)
  15. .flatMapSingle(new Func1<Short, Single<ByteBuf>>() {
  16. @Override
  17. public Single<ByteBuf> call(Short p) {
  18. return conductor.getFailoverLog(p);
  19. }
  20. });
  21. }

代码示例来源:origin: org.jboss.hal/hal-flow

  1. /** Executes multiple tasks in order. */
  2. static <C extends FlowContext> Single<C> series(C context, Collection<? extends Task<C>> tasks) {
  3. return Observable.from(tasks)
  4. .flatMapSingle(task -> task.call(context).toSingleDefault(context), false, 1)
  5. .doOnSubscribe(() -> context.progress.reset(tasks.size()))
  6. .doOnNext(c -> c.progress.tick())
  7. .doOnTerminate(context.progress::finish)
  8. .lastOrDefault(context).toSingle();
  9. }
  10. }

代码示例来源:origin: georocket/georocket

  1. .flatMapSingle(pair -> {
  2. String path = pair.getKey().getKey();
  3. Long size = pair.getKey().getValue();

代码示例来源:origin: couchbase/java-dcp-client

  1. @SuppressWarnings("unchecked")
  2. public Single<ByteBuf> getFailoverLog(final short partition) {
  3. return Observable
  4. .just(partition)
  5. .map(new Func1<Short, DcpChannel>() {
  6. @Override
  7. public DcpChannel call(Short aShort) {
  8. return masterChannelByPartition(partition);
  9. }
  10. })
  11. .flatMapSingle(new Func1<DcpChannel, Single<ByteBuf>>() {
  12. @Override
  13. public Single<ByteBuf> call(DcpChannel channel) {
  14. return channel.getFailoverLog(partition);
  15. }
  16. })
  17. .retryWhen(anyOf(NotConnectedException.class)
  18. .max(Integer.MAX_VALUE)
  19. .delay(Delay.fixed(200, TimeUnit.MILLISECONDS))
  20. .doOnRetry(new Action4<Integer, Throwable, Long, TimeUnit>() {
  21. @Override
  22. public void call(Integer integer, Throwable throwable, Long aLong, TimeUnit timeUnit) {
  23. LOGGER.debug("Rescheduling Get Failover Log for vbid {}, not connected (yet).", partition);
  24. }
  25. })
  26. .build()
  27. ).toSingle();
  28. }

代码示例来源:origin: org.jboss.hal/hal-dmr

  1. /**
  2. * Executes the composite operation until the operation successfully returns and the precondition is met.
  3. * The precondition receives the composite result of the operation.
  4. */
  5. @SuppressWarnings("HardCodedStringLiteral")
  6. public static Completable repeatCompositeUntil(Dispatcher dispatcher, int timeout, Composite composite,
  7. @Nullable Predicate<CompositeResult> until) {
  8. logger.debug("Repeat {} using {} seconds as timeout", composite, timeout);
  9. Single<CompositeResult> execution = Single.fromEmitter(em -> dispatcher.execute(composite, em::onSuccess,
  10. (op, fail) -> em.onSuccess(compositeFailure("Dispatcher failure: " + fail)),
  11. (op, ex) -> em.onSuccess(compositeFailure("Dispatcher exception: " + ex.getMessage()))));
  12. if (until == null) {
  13. until = r -> r.stream().noneMatch(ModelNode::isFailure); // default: until success
  14. }
  15. return Observable
  16. .interval(INTERVAL, MILLISECONDS) // execute a operation each INTERVAL millis
  17. .doOnEach(n -> logger.debug("#{}: execute {}", n.getValue(), composite))
  18. .flatMapSingle(n -> execution, false, 1)
  19. .takeUntil(until::test) // until succeeded
  20. .toCompletable().timeout(timeout, SECONDS); // wait succeeded or stop after timeout seconds
  21. }

代码示例来源:origin: georocket/georocket

  1. .map(RxStoreCursor::new)
  2. .flatMapObservable(RxStoreCursor::toObservable)
  3. .flatMapSingle(p -> store.rxGetOne(p.getRight())
  4. .flatMap(crs -> merger.merge(crs, p.getLeft(), out)

代码示例来源:origin: georocket/georocket

  1. /**
  2. * Imports a JSON file from the given input stream into the store
  3. * @param f the JSON file to read
  4. * @param correlationId a unique identifier for this import process
  5. * @param filename the name of the file currently being imported
  6. * @param timestamp denotes when the import process has started
  7. * @param layer the layer where the file should be stored (may be null)
  8. * @param tags the list of tags to attach to the file (may be null)
  9. * @param properties the map of properties to attach to the file (may be null)
  10. * @return an observable that will emit the number 1 when a chunk has been imported
  11. */
  12. protected Observable<Integer> importJSON(ReadStream<Buffer> f, String correlationId,
  13. String filename, long timestamp, String layer, List<String> tags, Map<String, Object> properties) {
  14. UTF8BomFilter bomFilter = new UTF8BomFilter();
  15. StringWindow window = new StringWindow();
  16. GeoJsonSplitter splitter = new GeoJsonSplitter(window);
  17. AtomicInteger processing = new AtomicInteger(0);
  18. return f.toObservable()
  19. .map(Buffer::getDelegate)
  20. .map(bomFilter::filter)
  21. .doOnNext(window::append)
  22. .compose(new JsonParserTransformer())
  23. .flatMap(splitter::onEventObservable)
  24. .flatMapSingle(result -> {
  25. IndexMeta indexMeta = new IndexMeta(correlationId, filename,
  26. timestamp, tags, properties, null);
  27. return addToStoreWithPause(result, layer, indexMeta, f, processing)
  28. .toSingleDefault(1);
  29. });
  30. }

代码示例来源:origin: couchbase/java-dcp-client

  1. @SuppressWarnings("unchecked")
  2. private Observable<ByteBuf> getSeqnosForChannel(final DcpChannel channel) {
  3. return Observable
  4. .just(channel)
  5. .flatMapSingle(new Func1<DcpChannel, Single<ByteBuf>>() {
  6. @Override
  7. public Single<ByteBuf> call(DcpChannel channel) {
  8. return channel.getSeqnos();
  9. }
  10. })
  11. .retryWhen(anyOf(NotConnectedException.class)
  12. .max(Integer.MAX_VALUE)
  13. .delay(Delay.fixed(200, TimeUnit.MILLISECONDS))
  14. .doOnRetry(new Action4<Integer, Throwable, Long, TimeUnit>() {
  15. @Override
  16. public void call(Integer integer, Throwable throwable, Long aLong, TimeUnit timeUnit) {
  17. LOGGER.debug("Rescheduling get Seqnos for channel {}, not connected (yet).", channel);
  18. }
  19. })
  20. .build()
  21. );
  22. }

代码示例来源:origin: org.jboss.hal/hal-dmr

  1. /**
  2. * Executes the operation until the operation successfully returns and the precondition is met. The precondition
  3. * receives the result of the operation.
  4. */
  5. @SuppressWarnings("HardCodedStringLiteral")
  6. public static Completable repeatOperationUntil(Dispatcher dispatcher, int timeout, Operation operation,
  7. @Nullable Predicate<ModelNode> until) {
  8. logger.debug("Repeat {} using {} seconds timeout", operation.asCli(), timeout);
  9. Single<ModelNode> execution = Single.fromEmitter(em -> dispatcher.execute(operation, em::onSuccess,
  10. (op, fail) -> em.onSuccess(operationFailure("Dispatcher failure: " + fail)),
  11. (op, ex) -> em.onSuccess(operationFailure("Dispatcher exception: " + ex.getMessage()))));
  12. if (until == null) {
  13. until = r -> !r.isFailure(); // default: until success
  14. }
  15. return Observable
  16. .interval(INTERVAL, MILLISECONDS) // execute a operation each INTERVAL millis
  17. .doOnEach(n -> logger.debug("#{}: execute {}", n.getValue(), operation.asCli()))
  18. .flatMapSingle(n -> execution, false, 1)
  19. .takeUntil(until::test) // until succeeded
  20. .toCompletable().timeout(timeout, SECONDS); // wait succeeded or stop after timeout seconds
  21. }

代码示例来源:origin: georocket/georocket

  1. .flatMapSingle(result -> {
  2. String crsString = fallbackCRSString;
  3. if (crsIndexer.getCRS() != null) {

代码示例来源:origin: georocket/georocket

  1. s = metas;
  2. } else {
  3. s = metas.flatMapSingle(meta -> m.init(meta).toSingleDefault(meta));

代码示例来源:origin: georocket/georocket

  1. private void doMerge(TestContext context, Observable<Buffer> chunks,
  2. Observable<ChunkMeta> metas, String jsonContents) {
  3. MultiMerger m = new MultiMerger(false);
  4. BufferWriteStream bws = new BufferWriteStream();
  5. Async async = context.async();
  6. metas
  7. .flatMapSingle(meta -> m.init(meta).toSingleDefault(meta))
  8. .toList()
  9. .flatMap(l -> chunks.map(DelegateChunkReadStream::new)
  10. .<ChunkMeta, Pair<ChunkReadStream, ChunkMeta>>zipWith(l, Pair::of))
  11. .flatMapCompletable(p -> m.merge(p.getLeft(), p.getRight(), bws))
  12. .toCompletable()
  13. .subscribe(() -> {
  14. m.finish(bws);
  15. context.assertEquals(jsonContents, bws.getBuffer().toString("utf-8"));
  16. async.complete();
  17. }, context::fail);
  18. }

代码示例来源:origin: georocket/georocket

  1. private void doMerge(TestContext context, Observable<Buffer> chunks,
  2. Observable<GeoJsonChunkMeta> metas, String jsonContents, boolean optimistic) {
  3. GeoJsonMerger m = new GeoJsonMerger(optimistic);
  4. BufferWriteStream bws = new BufferWriteStream();
  5. Async async = context.async();
  6. Observable<GeoJsonChunkMeta> s;
  7. if (optimistic) {
  8. s = metas;
  9. } else {
  10. s = metas.flatMapSingle(meta -> m.init(meta).toSingleDefault(meta));
  11. }
  12. s.toList()
  13. .flatMap(l -> chunks.map(DelegateChunkReadStream::new)
  14. .<GeoJsonChunkMeta, Pair<ChunkReadStream, GeoJsonChunkMeta>>zipWith(l, Pair::of))
  15. .flatMapCompletable(p -> m.merge(p.getLeft(), p.getRight(), bws))
  16. .toCompletable()
  17. .subscribe(() -> {
  18. m.finish(bws);
  19. context.assertEquals(jsonContents, bws.getBuffer().toString("utf-8"));
  20. async.complete();
  21. }, context::fail);
  22. }

代码示例来源:origin: georocket/georocket

  1. /**
  2. * Test if a service can be published an unpublished again
  3. * @param context the test context
  4. */
  5. @Test
  6. public void unpublish(TestContext context) {
  7. Vertx vertx = new Vertx(rule.vertx());
  8. Async async = context.async();
  9. ServiceDiscovery discovery = ServiceDiscovery.create(vertx);
  10. Service.publishOnce("A", "a", discovery, vertx)
  11. .andThen(Service.discover("A", discovery, vertx))
  12. .count()
  13. .doOnNext(count -> context.assertEquals(1, count))
  14. .flatMap(v -> Service.discover("A", discovery, vertx))
  15. .flatMapSingle(service -> service.unpublish(discovery).toSingleDefault(0))
  16. .flatMap(v -> Service.discover("A", discovery, vertx))
  17. .count()
  18. .doOnTerminate(discovery::close)
  19. .subscribe(count -> {
  20. context.assertEquals(0, count);
  21. async.complete();
  22. }, context::fail);
  23. }
  24. }

相关文章

Observable类方法