Usage and code examples of the rx.Observable.flatMapCompletable() method

Reposted by x33g5p2x on 2022-01-25, category: Other

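This article collects code examples of the rx.Observable.flatMapCompletable() method in Java and shows how Observable.flatMapCompletable() is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven and were extracted from selected projects, so they should serve as useful references. Details of the Observable.flatMapCompletable() method are as follows:
Package path: rx.Observable
Class name: Observable
Method name: flatMapCompletable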

Introduction to Observable.flatMapCompletable

No description available.

Code examples

Code example source: Aptoide/aptoide-client-v8

@Override public Completable pauseAllDownloads() {
 return downloadsRepository.getDownloadsInProgress()
   .filter(downloads -> !downloads.isEmpty())
   .flatMapIterable(downloads -> downloads)
   .flatMap(download -> getAppDownloader(download.getMd5()).flatMapCompletable(
     appDownloader -> appDownloader.pauseAppDownload())
     .map(appDownloader -> download))
   .toCompletable();
}

Code example source: Aptoide/aptoide-client-v8

public void setup() {
 subscriptions.add(
   Observable.interval(initialDelay, sendInterval, TimeUnit.MILLISECONDS, timerScheduler)
     .flatMap(time -> persistence.getAll()
       .first())
     .filter(events -> events.size() > 0)
     .flatMapCompletable(events -> sendEvents(new ArrayList<>(events)))
     .doOnError(throwable -> crashReport.log(throwable))
     .retry()
     .subscribe());
}

Code example source: Aptoide/aptoide-client-v8

@Override public Completable removeAppDownload() {
 return Observable.from(app.getDownloadFiles())
   .flatMap(downloadAppFile -> getFileDownloader(downloadAppFile.getMainDownloadPath()))
   .flatMapCompletable(fileDownloader -> fileDownloader.removeDownloadFile())
   .toCompletable();
}

Code example source: georocket/georocket

/**
 * Initialize the given merger. Perform a search using the given search string
 * and pass all chunk metadata retrieved to the merger.
 * @param merger the merger to initialize
 * @param data data to use for the initialization
 * @return a Completable that will complete when the merger has been
 * initialized with all results
 */
private Completable initializeMerger(Merger<ChunkMeta> merger, Single<StoreCursor> data) {
 return data
  .map(RxStoreCursor::new)
  .flatMapObservable(RxStoreCursor::toObservable)
  .map(Pair::getLeft)
  .flatMapCompletable(merger::init)
  .toCompletable();
}

Code example source: Aptoide/aptoide-client-v8

@Override public Completable pauseAppDownload() {
 return Observable.from(app.getDownloadFiles())
   .flatMap(downloadAppFile -> getFileDownloader(downloadAppFile.getMainDownloadPath()))
   .flatMapCompletable(fileDownloader -> fileDownloader.pauseDownload())
   .toCompletable();
}

Code example source: couchbase/java-dcp-client

/**
 * Stop DCP streams for the given partition IDs (vbids).
 *
 * If no ids are provided, all partitions will be stopped. Note that you can also use this to "pause" streams
 * if {@link #startStreaming(Short...)} is called later - since the session state is persisted and streaming
 * will resume from the current position.
 *
 * @param vbids the partition ids (0-indexed) to stop streaming for.
 * @return a {@link Completable} indicating that streaming has stopped or failed.
 */
public Completable stopStreaming(Short... vbids) {
  List<Short> partitions = partitionsForVbids(numPartitions(), vbids);
  LOGGER.info("Stopping to Stream for " + partitions.size() + " partitions");
  LOGGER.debug("Stream stop against partitions: {}", partitions);
  return Observable
      .from(partitions)
      .flatMapCompletable(new Func1<Short, Completable>() {
        @Override
        public Completable call(Short p) {
          return conductor.stopStreamForPartition(p);
        }
      })
      .toCompletable();
}

Code example source: Aptoide/aptoide-client-v8

@Override public Completable pauseDownload(String md5) {
 return downloadsRepository.getDownload(md5)
   .first()
   .map(download -> {
    download.setOverallDownloadStatus(Download.PAUSED);
    downloadsRepository.save(download);
    return download;
   })
   .flatMap(download -> getAppDownloader(download.getMd5()))
   .flatMapCompletable(appDownloader -> appDownloader.pauseAppDownload())
   .toCompletable();
}

Code example source: Aptoide/aptoide-client-v8

@NonNull private Completable sendEvents(List<Event> events) {
  return persistence.remove(events)
    .toSingleDefault(events)
    .toObservable()
    .flatMapIterable(__ -> events)
    .map(event -> service.send(event)
      .toObservable()
      .flatMap(o -> Observable.empty())
      .cast(Event.class)
      .onErrorResumeNext(throwable -> Observable.just(event)))
    .toList()
    .flatMap(observables -> Observable.merge(observables))
    .toList()
    .filter(failedEvents -> !failedEvents.isEmpty())
    .flatMapCompletable(failedEvents -> persistence.save(failedEvents))
    .toCompletable();
}

Code example source: couchbase/java-dcp-client

public Completable stop() {
  LOGGER.debug("Instructed to shutdown.");
  stopped = true;
  Completable channelShutdown = Observable
      .from(channels)
      .flatMapCompletable(new Func1<DcpChannel, Completable>() {
        @Override
        public Completable call(DcpChannel dcpChannel) {
          return dcpChannel.disconnect();
        }
      })
      .toCompletable();
  if (ownsConfigProvider) {
    channelShutdown = channelShutdown.andThen(configProvider.stop());
  }
  return channelShutdown.doOnCompleted(new Action0() {
    @Override
    public void call() {
      LOGGER.info("Shutdown complete.");
    }
  });
}

Code example source: Aptoide/aptoide-client-v8

@Override public Completable invalidateDatabase() {
 return getDownloadsList().first()
   .flatMapIterable(downloads -> downloads)
   .filter(download -> getStateIfFileExists(download) == Download.FILE_MISSING)
   .flatMapCompletable(download -> downloadsRepository.remove(download.getMd5()))
   .toList()
   .toCompletable();
}

Code example source: couchbase/java-dcp-client

@SuppressWarnings("unchecked")
public Completable startStreamForPartition(final short partition, final long vbuuid, final long startSeqno,
                      final long endSeqno, final long snapshotStartSeqno, final long snapshotEndSeqno) {
  return Observable
      .just(partition)
      .map(new Func1<Short, DcpChannel>() {
        @Override
        public DcpChannel call(Short aShort) {
          return masterChannelByPartition(partition);
        }
      })
      .flatMapCompletable(new Func1<DcpChannel, Completable>() {
        @Override
        public Completable call(DcpChannel channel) {
          return channel.openStream(partition, vbuuid, startSeqno, endSeqno, snapshotStartSeqno, snapshotEndSeqno);
        }
      })
      .retryWhen(anyOf(NotConnectedException.class)
          .max(Integer.MAX_VALUE)
          .delay(Delay.fixed(200, TimeUnit.MILLISECONDS))
          .doOnRetry(new Action4<Integer, Throwable, Long, TimeUnit>() {
            @Override
            public void call(Integer integer, Throwable throwable, Long aLong, TimeUnit timeUnit) {
              LOGGER.debug("Rescheduling Stream Start for vbid {}, not connected (yet).", partition);
            }
          })
          .build()
      )
      .toCompletable();
}

Code example source: georocket/georocket

.reduce((p1, p2) -> Pair.of(p1.getLeft() + p2.getLeft(),
  p1.getRight() + p2.getRight()))
.flatMapCompletable(p -> {
 long count = p.getLeft();
 long notaccepted = p.getRight();

Code example source: couchbase/java-dcp-client

.flatMapCompletable(new Func1<Short, Completable>() {
  @Override
  public Completable call(Short partition) {

Code example source: couchbase/java-dcp-client

.flatMapCompletable(new Func1<Long, Completable>() {
  @Override
  public Completable call(Long aLong) {

Code example source: georocket/georocket

})
.toList()
.flatMapCompletable(l -> {
 if (!l.isEmpty()) {
  return insertDocuments(l);

Code example source: georocket/georocket

@Override
public void start() {
 log.info("Launching importer ...");
 store = new RxStore(StoreFactory.createStore(getVertx()));
 String storagePath = config().getString(ConfigConstants.STORAGE_FILE_PATH);
 incoming = storagePath + "/incoming";
 
 vertx.eventBus().<JsonObject>localConsumer(AddressConstants.IMPORTER_IMPORT)
  .toObservable()
  .onBackpressureBuffer() // unlimited buffer
  .flatMapCompletable(msg -> {
   // call onImport() but ignore errors. onImport() will handle errors for us.
   return onImport(msg).onErrorComplete();
  }, false, MAX_PARALLEL_IMPORTS)
  .subscribe(v -> {
   // ignore
  }, err -> {
   // This is bad. It will unsubscribe the consumer from the eventbus!
   // Should never happen anyhow. If it does, something else has
   // completely gone wrong.
   log.fatal("Could not import file", err);
  });
 vertx.eventBus().localConsumer(AddressConstants.IMPORTER_PAUSE, this::onPause);
}

Code example source: georocket/georocket

.buffer(BUFFER_TIMESPAN, TimeUnit.MILLISECONDS, maxBulkSize)
.flatMapCompletable(messages -> {
 queuedAddMessages -= messages.size();

Code example source: georocket/georocket

.flatMap(l -> chunks.map(DelegateChunkReadStream::new)
  .<XMLChunkMeta, Pair<ChunkReadStream, XMLChunkMeta>>zipWith(l, Pair::of))
.flatMapCompletable(p -> m.merge(p.getLeft(), p.getRight(), bws))
.toCompletable()
.subscribe(() -> {

Code example source: georocket/georocket

private void doMerge(TestContext context, Observable<Buffer> chunks,
  Observable<ChunkMeta> metas, String jsonContents) {
 MultiMerger m = new MultiMerger(false);
 BufferWriteStream bws = new BufferWriteStream();
 Async async = context.async();
 metas
  .flatMapSingle(meta -> m.init(meta).toSingleDefault(meta))
  .toList()
  .flatMap(l -> chunks.map(DelegateChunkReadStream::new)
    .<ChunkMeta, Pair<ChunkReadStream, ChunkMeta>>zipWith(l, Pair::of))
  .flatMapCompletable(p -> m.merge(p.getLeft(), p.getRight(), bws))
  .toCompletable()
  .subscribe(() -> {
   m.finish(bws);
   context.assertEquals(jsonContents, bws.getBuffer().toString("utf-8"));
   async.complete();
  }, context::fail);
}

Code example source: georocket/georocket

private void doMerge(TestContext context, Observable<Buffer> chunks,
  Observable<GeoJsonChunkMeta> metas, String jsonContents, boolean optimistic) {
 GeoJsonMerger m = new GeoJsonMerger(optimistic);
 BufferWriteStream bws = new BufferWriteStream();
 Async async = context.async();
 Observable<GeoJsonChunkMeta> s;
 if (optimistic) {
  s = metas;
 } else {
  s = metas.flatMapSingle(meta -> m.init(meta).toSingleDefault(meta));
 }
 s.toList()
  .flatMap(l -> chunks.map(DelegateChunkReadStream::new)
    .<GeoJsonChunkMeta, Pair<ChunkReadStream, GeoJsonChunkMeta>>zipWith(l, Pair::of))
  .flatMapCompletable(p -> m.merge(p.getLeft(), p.getRight(), bws))
  .toCompletable()
  .subscribe(() -> {
   m.finish(bws);
   context.assertEquals(jsonContents, bws.getBuffer().toString("utf-8"));
   async.complete();
  }, context::fail);
}
