rx.Observable.flatMapCompletable()方法的使用及代码示例

x33g5p2x  于2022-01-25 转载在 其他  
字(9.4k)|赞(0)|评价(0)|浏览(203)

本文整理了Java中rx.Observable.flatMapCompletable()方法的一些代码示例,展示了Observable.flatMapCompletable()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Observable.flatMapCompletable()方法的具体详情如下:
包路径:rx.Observable
类名称:Observable
方法名:flatMapCompletable

Observable.flatMapCompletable介绍

暂无

代码示例

代码示例来源:origin: Aptoide/aptoide-client-v8

  1. @Override public Completable pauseAllDownloads() {
  2. return downloadsRepository.getDownloadsInProgress()
  3. .filter(downloads -> !downloads.isEmpty())
  4. .flatMapIterable(downloads -> downloads)
  5. .flatMap(download -> getAppDownloader(download.getMd5()).flatMapCompletable(
  6. appDownloader -> appDownloader.pauseAppDownload())
  7. .map(appDownloader -> download))
  8. .toCompletable();
  9. }

代码示例来源:origin: Aptoide/aptoide-client-v8

  1. public void setup() {
  2. subscriptions.add(
  3. Observable.interval(initialDelay, sendInterval, TimeUnit.MILLISECONDS, timerScheduler)
  4. .flatMap(time -> persistence.getAll()
  5. .first())
  6. .filter(events -> events.size() > 0)
  7. .flatMapCompletable(events -> sendEvents(new ArrayList<>(events)))
  8. .doOnError(throwable -> crashReport.log(throwable))
  9. .retry()
  10. .subscribe());
  11. }

代码示例来源:origin: Aptoide/aptoide-client-v8

  1. @Override public Completable removeAppDownload() {
  2. return Observable.from(app.getDownloadFiles())
  3. .flatMap(downloadAppFile -> getFileDownloader(downloadAppFile.getMainDownloadPath()))
  4. .flatMapCompletable(fileDownloader -> fileDownloader.removeDownloadFile())
  5. .toCompletable();
  6. }

代码示例来源:origin: georocket/georocket

  1. /**
  2. * Initialize the given merger. Perform a search using the given search string
  3. * and pass all chunk metadata retrieved to the merger.
  4. * @param merger the merger to initialize
  5. * @param data data to use for the initialization
  6. * @return a Completable that will complete when the merger has been
  7. * initialized with all results
  8. */
  9. private Completable initializeMerger(Merger<ChunkMeta> merger, Single<StoreCursor> data) {
  10. return data
  11. .map(RxStoreCursor::new)
  12. .flatMapObservable(RxStoreCursor::toObservable)
  13. .map(Pair::getLeft)
  14. .flatMapCompletable(merger::init)
  15. .toCompletable();
  16. }

代码示例来源:origin: Aptoide/aptoide-client-v8

  1. @Override public Completable pauseAppDownload() {
  2. return Observable.from(app.getDownloadFiles())
  3. .flatMap(downloadAppFile -> getFileDownloader(downloadAppFile.getMainDownloadPath()))
  4. .flatMapCompletable(fileDownloader -> fileDownloader.pauseDownload())
  5. .toCompletable();
  6. }

代码示例来源:origin: couchbase/java-dcp-client

  1. /**
  2. * Stop DCP streams for the given partition IDs (vbids).
  3. *
  4. * If no ids are provided, all partitions will be stopped. Note that you can also use this to "pause" streams
  5. * if {@link #startStreaming(Short...)} is called later - since the session state is persisted and streaming
  6. * will resume from the current position.
  7. *
  8. * @param vbids the partition ids (0-indexed) to stop streaming for.
  9. * @return a {@link Completable} indicating that streaming has stopped or failed.
  10. */
  11. public Completable stopStreaming(Short... vbids) {
  12. List<Short> partitions = partitionsForVbids(numPartitions(), vbids);
  13. LOGGER.info("Stopping to Stream for " + partitions.size() + " partitions");
  14. LOGGER.debug("Stream stop against partitions: {}", partitions);
  15. return Observable
  16. .from(partitions)
  17. .flatMapCompletable(new Func1<Short, Completable>() {
  18. @Override
  19. public Completable call(Short p) {
  20. return conductor.stopStreamForPartition(p);
  21. }
  22. })
  23. .toCompletable();
  24. }

代码示例来源:origin: Aptoide/aptoide-client-v8

  1. @Override public Completable pauseDownload(String md5) {
  2. return downloadsRepository.getDownload(md5)
  3. .first()
  4. .map(download -> {
  5. download.setOverallDownloadStatus(Download.PAUSED);
  6. downloadsRepository.save(download);
  7. return download;
  8. })
  9. .flatMap(download -> getAppDownloader(download.getMd5()))
  10. .flatMapCompletable(appDownloader -> appDownloader.pauseAppDownload())
  11. .toCompletable();
  12. }

代码示例来源:origin: Aptoide/aptoide-client-v8

  1. @NonNull private Completable sendEvents(List<Event> events) {
  2. return persistence.remove(events)
  3. .toSingleDefault(events)
  4. .toObservable()
  5. .flatMapIterable(__ -> events)
  6. .map(event -> service.send(event)
  7. .toObservable()
  8. .flatMap(o -> Observable.empty())
  9. .cast(Event.class)
  10. .onErrorResumeNext(throwable -> Observable.just(event)))
  11. .toList()
  12. .flatMap(observables -> Observable.merge(observables))
  13. .toList()
  14. .filter(failedEvents -> !failedEvents.isEmpty())
  15. .flatMapCompletable(failedEvents -> persistence.save(failedEvents))
  16. .toCompletable();
  17. }
  18. }

代码示例来源:origin: couchbase/java-dcp-client

  1. public Completable stop() {
  2. LOGGER.debug("Instructed to shutdown.");
  3. stopped = true;
  4. Completable channelShutdown = Observable
  5. .from(channels)
  6. .flatMapCompletable(new Func1<DcpChannel, Completable>() {
  7. @Override
  8. public Completable call(DcpChannel dcpChannel) {
  9. return dcpChannel.disconnect();
  10. }
  11. })
  12. .toCompletable();
  13. if (ownsConfigProvider) {
  14. channelShutdown = channelShutdown.andThen(configProvider.stop());
  15. }
  16. return channelShutdown.doOnCompleted(new Action0() {
  17. @Override
  18. public void call() {
  19. LOGGER.info("Shutdown complete.");
  20. }
  21. });
  22. }

代码示例来源:origin: Aptoide/aptoide-client-v8

  1. @Override public Completable invalidateDatabase() {
  2. return getDownloadsList().first()
  3. .flatMapIterable(downloads -> downloads)
  4. .filter(download -> getStateIfFileExists(download) == Download.FILE_MISSING)
  5. .flatMapCompletable(download -> downloadsRepository.remove(download.getMd5()))
  6. .toList()
  7. .toCompletable();
  8. }

代码示例来源:origin: couchbase/java-dcp-client

  1. @SuppressWarnings("unchecked")
  2. public Completable startStreamForPartition(final short partition, final long vbuuid, final long startSeqno,
  3. final long endSeqno, final long snapshotStartSeqno, final long snapshotEndSeqno) {
  4. return Observable
  5. .just(partition)
  6. .map(new Func1<Short, DcpChannel>() {
  7. @Override
  8. public DcpChannel call(Short aShort) {
  9. return masterChannelByPartition(partition);
  10. }
  11. })
  12. .flatMapCompletable(new Func1<DcpChannel, Completable>() {
  13. @Override
  14. public Completable call(DcpChannel channel) {
  15. return channel.openStream(partition, vbuuid, startSeqno, endSeqno, snapshotStartSeqno, snapshotEndSeqno);
  16. }
  17. })
  18. .retryWhen(anyOf(NotConnectedException.class)
  19. .max(Integer.MAX_VALUE)
  20. .delay(Delay.fixed(200, TimeUnit.MILLISECONDS))
  21. .doOnRetry(new Action4<Integer, Throwable, Long, TimeUnit>() {
  22. @Override
  23. public void call(Integer integer, Throwable throwable, Long aLong, TimeUnit timeUnit) {
  24. LOGGER.debug("Rescheduling Stream Start for vbid {}, not connected (yet).", partition);
  25. }
  26. })
  27. .build()
  28. )
  29. .toCompletable();
  30. }

代码示例来源:origin: georocket/georocket

  1. .reduce((p1, p2) -> Pair.of(p1.getLeft() + p2.getLeft(),
  2. p1.getRight() + p2.getRight()))
  3. .flatMapCompletable(p -> {
  4. long count = p.getLeft();
  5. long notaccepted = p.getRight();

代码示例来源:origin: couchbase/java-dcp-client

  1. .flatMapCompletable(new Func1<Short, Completable>() {
  2. @Override
  3. public Completable call(Short partition) {

代码示例来源:origin: couchbase/java-dcp-client

  1. .flatMapCompletable(new Func1<Long, Completable>() {
  2. @Override
  3. public Completable call(Long aLong) {

代码示例来源:origin: georocket/georocket

  1. })
  2. .toList()
  3. .flatMapCompletable(l -> {
  4. if (!l.isEmpty()) {
  5. return insertDocuments(l);

代码示例来源:origin: georocket/georocket

  1. @Override
  2. public void start() {
  3. log.info("Launching importer ...");
  4. store = new RxStore(StoreFactory.createStore(getVertx()));
  5. String storagePath = config().getString(ConfigConstants.STORAGE_FILE_PATH);
  6. incoming = storagePath + "/incoming";
  7. vertx.eventBus().<JsonObject>localConsumer(AddressConstants.IMPORTER_IMPORT)
  8. .toObservable()
  9. .onBackpressureBuffer() // unlimited buffer
  10. .flatMapCompletable(msg -> {
  11. // call onImport() but ignore errors. onImport() will handle errors for us.
  12. return onImport(msg).onErrorComplete();
  13. }, false, MAX_PARALLEL_IMPORTS)
  14. .subscribe(v -> {
  15. // ignore
  16. }, err -> {
  17. // This is bad. It will unsubscribe the consumer from the eventbus!
  18. // Should never happen anyhow. If it does, something else has
  19. // completely gone wrong.
  20. log.fatal("Could not import file", err);
  21. });
  22. vertx.eventBus().localConsumer(AddressConstants.IMPORTER_PAUSE, this::onPause);
  23. }

代码示例来源:origin: georocket/georocket

  1. .buffer(BUFFER_TIMESPAN, TimeUnit.MILLISECONDS, maxBulkSize)
  2. .flatMapCompletable(messages -> {
  3. queuedAddMessages -= messages.size();

代码示例来源:origin: georocket/georocket

  1. .flatMap(l -> chunks.map(DelegateChunkReadStream::new)
  2. .<XMLChunkMeta, Pair<ChunkReadStream, XMLChunkMeta>>zipWith(l, Pair::of))
  3. .flatMapCompletable(p -> m.merge(p.getLeft(), p.getRight(), bws))
  4. .toCompletable()
  5. .subscribe(() -> {

代码示例来源:origin: georocket/georocket

  1. private void doMerge(TestContext context, Observable<Buffer> chunks,
  2. Observable<ChunkMeta> metas, String jsonContents) {
  3. MultiMerger m = new MultiMerger(false);
  4. BufferWriteStream bws = new BufferWriteStream();
  5. Async async = context.async();
  6. metas
  7. .flatMapSingle(meta -> m.init(meta).toSingleDefault(meta))
  8. .toList()
  9. .flatMap(l -> chunks.map(DelegateChunkReadStream::new)
  10. .<ChunkMeta, Pair<ChunkReadStream, ChunkMeta>>zipWith(l, Pair::of))
  11. .flatMapCompletable(p -> m.merge(p.getLeft(), p.getRight(), bws))
  12. .toCompletable()
  13. .subscribe(() -> {
  14. m.finish(bws);
  15. context.assertEquals(jsonContents, bws.getBuffer().toString("utf-8"));
  16. async.complete();
  17. }, context::fail);
  18. }

代码示例来源:origin: georocket/georocket

  1. private void doMerge(TestContext context, Observable<Buffer> chunks,
  2. Observable<GeoJsonChunkMeta> metas, String jsonContents, boolean optimistic) {
  3. GeoJsonMerger m = new GeoJsonMerger(optimistic);
  4. BufferWriteStream bws = new BufferWriteStream();
  5. Async async = context.async();
  6. Observable<GeoJsonChunkMeta> s;
  7. if (optimistic) {
  8. s = metas;
  9. } else {
  10. s = metas.flatMapSingle(meta -> m.init(meta).toSingleDefault(meta));
  11. }
  12. s.toList()
  13. .flatMap(l -> chunks.map(DelegateChunkReadStream::new)
  14. .<GeoJsonChunkMeta, Pair<ChunkReadStream, GeoJsonChunkMeta>>zipWith(l, Pair::of))
  15. .flatMapCompletable(p -> m.merge(p.getLeft(), p.getRight(), bws))
  16. .toCompletable()
  17. .subscribe(() -> {
  18. m.finish(bws);
  19. context.assertEquals(jsonContents, bws.getBuffer().toString("utf-8"));
  20. async.complete();
  21. }, context::fail);
  22. }

相关文章

Observable类方法