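This article collects code examples for the rx.Observable.flatMapCompletable() method in Java and shows how Observable.flatMapCompletable() is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they should serve as useful references. Details of the Observable.flatMapCompletable() method are as follows: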
Package path: rx.Observable
Class name: Observable
Method name: flatMapCompletable
Description: none available
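As a rough illustration of the method named above, the following is a minimal sketch of its behavior in RxJava 1.x: each upstream item is mapped to a Completable, each Completable is subscribed, and the resulting Observable emits no items and only signals completion or an error. The class name FlatMapCompletableSketch and the helper recordItem are hypothetical, and the sketch assumes an RxJava 1.x release recent enough to include the operator.

```java
import java.util.ArrayList;
import java.util.List;

import rx.Completable;
import rx.Observable;

public class FlatMapCompletableSketch {

  // Hypothetical helper: a Completable that appends the item to the log when subscribed.
  static Completable recordItem(List<String> log, String item) {
    return Completable.fromAction(() -> log.add(item));
  }

  static List<String> run() {
    List<String> log = new ArrayList<>();
    Observable.just("a", "b", "c")
        // Map each item to a Completable; the resulting Observable emits no items.
        .flatMapCompletable(item -> recordItem(log, item))
        // Convert the item-less Observable into a Completable.
        .toCompletable()
        // Block until every inner Completable has finished.
        .await();
    return log;
  }

  public static void main(String[] args) {
    System.out.println(run());
  }
}
```

Because every inner Completable here runs synchronously on subscription, the log is filled in upstream order. With asynchronous Completables the side effects may interleave.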
Code example source: origin: Aptoide/aptoide-client-v8
@Override public Completable pauseAllDownloads() {
  return downloadsRepository.getDownloadsInProgress()
      .filter(downloads -> !downloads.isEmpty())
      .flatMapIterable(downloads -> downloads)
      .flatMap(download -> getAppDownloader(download.getMd5()).flatMapCompletable(
          appDownloader -> appDownloader.pauseAppDownload())
          .map(appDownloader -> download))
      .toCompletable();
}
Code example source: origin: Aptoide/aptoide-client-v8
public void setup() {
  subscriptions.add(
      Observable.interval(initialDelay, sendInterval, TimeUnit.MILLISECONDS, timerScheduler)
          .flatMap(time -> persistence.getAll()
              .first())
          .filter(events -> events.size() > 0)
          .flatMapCompletable(events -> sendEvents(new ArrayList<>(events)))
          .doOnError(throwable -> crashReport.log(throwable))
          .retry()
          .subscribe());
}
Code example source: origin: Aptoide/aptoide-client-v8
@Override public Completable removeAppDownload() {
  return Observable.from(app.getDownloadFiles())
      .flatMap(downloadAppFile -> getFileDownloader(downloadAppFile.getMainDownloadPath()))
      .flatMapCompletable(fileDownloader -> fileDownloader.removeDownloadFile())
      .toCompletable();
}
Code example source: origin: georocket/georocket
/**
 * Initialize the given merger. Perform a search using the given search string
 * and pass all chunk metadata retrieved to the merger.
 * @param merger the merger to initialize
 * @param data data to use for the initialization
 * @return a Completable that will complete when the merger has been
 * initialized with all results
 */
private Completable initializeMerger(Merger<ChunkMeta> merger, Single<StoreCursor> data) {
  return data
      .map(RxStoreCursor::new)
      .flatMapObservable(RxStoreCursor::toObservable)
      .map(Pair::getLeft)
      .flatMapCompletable(merger::init)
      .toCompletable();
}
Code example source: origin: Aptoide/aptoide-client-v8
@Override public Completable pauseAppDownload() {
  return Observable.from(app.getDownloadFiles())
      .flatMap(downloadAppFile -> getFileDownloader(downloadAppFile.getMainDownloadPath()))
      .flatMapCompletable(fileDownloader -> fileDownloader.pauseDownload())
      .toCompletable();
}
Code example source: origin: couchbase/java-dcp-client
/**
 * Stop DCP streams for the given partition IDs (vbids).
 *
 * If no ids are provided, all partitions will be stopped. Note that you can also use this to "pause" streams
 * if {@link #startStreaming(Short...)} is called later - since the session state is persisted and streaming
 * will resume from the current position.
 *
 * @param vbids the partition ids (0-indexed) to stop streaming for.
 * @return a {@link Completable} indicating that streaming has stopped or failed.
 */
public Completable stopStreaming(Short... vbids) {
    List<Short> partitions = partitionsForVbids(numPartitions(), vbids);
    LOGGER.info("Stopping to Stream for " + partitions.size() + " partitions");
    LOGGER.debug("Stream stop against partitions: {}", partitions);
    return Observable
        .from(partitions)
        .flatMapCompletable(new Func1<Short, Completable>() {
            @Override
            public Completable call(Short p) {
                return conductor.stopStreamForPartition(p);
            }
        })
        .toCompletable();
}
Code example source: origin: Aptoide/aptoide-client-v8
@Override public Completable pauseDownload(String md5) {
  return downloadsRepository.getDownload(md5)
      .first()
      .map(download -> {
        download.setOverallDownloadStatus(Download.PAUSED);
        downloadsRepository.save(download);
        return download;
      })
      .flatMap(download -> getAppDownloader(download.getMd5()))
      .flatMapCompletable(appDownloader -> appDownloader.pauseAppDownload())
      .toCompletable();
}
Code example source: origin: Aptoide/aptoide-client-v8
@NonNull private Completable sendEvents(List<Event> events) {
  return persistence.remove(events)
      .toSingleDefault(events)
      .toObservable()
      .flatMapIterable(__ -> events)
      .map(event -> service.send(event)
          .toObservable()
          .flatMap(o -> Observable.empty())
          .cast(Event.class)
          .onErrorResumeNext(throwable -> Observable.just(event)))
      .toList()
      .flatMap(observables -> Observable.merge(observables))
      .toList()
      .filter(failedEvents -> !failedEvents.isEmpty())
      .flatMapCompletable(failedEvents -> persistence.save(failedEvents))
      .toCompletable();
}
Code example source: origin: couchbase/java-dcp-client
public Completable stop() {
    LOGGER.debug("Instructed to shutdown.");
    stopped = true;
    Completable channelShutdown = Observable
        .from(channels)
        .flatMapCompletable(new Func1<DcpChannel, Completable>() {
            @Override
            public Completable call(DcpChannel dcpChannel) {
                return dcpChannel.disconnect();
            }
        })
        .toCompletable();
    if (ownsConfigProvider) {
        channelShutdown = channelShutdown.andThen(configProvider.stop());
    }
    return channelShutdown.doOnCompleted(new Action0() {
        @Override
        public void call() {
            LOGGER.info("Shutdown complete.");
        }
    });
}
Code example source: origin: Aptoide/aptoide-client-v8
@Override public Completable invalidateDatabase() {
  return getDownloadsList().first()
      .flatMapIterable(downloads -> downloads)
      .filter(download -> getStateIfFileExists(download) == Download.FILE_MISSING)
      .flatMapCompletable(download -> downloadsRepository.remove(download.getMd5()))
      .toList()
      .toCompletable();
}
Code example source: origin: couchbase/java-dcp-client
@SuppressWarnings("unchecked")
public Completable startStreamForPartition(final short partition, final long vbuuid, final long startSeqno,
                                           final long endSeqno, final long snapshotStartSeqno, final long snapshotEndSeqno) {
    return Observable
        .just(partition)
        .map(new Func1<Short, DcpChannel>() {
            @Override
            public DcpChannel call(Short aShort) {
                return masterChannelByPartition(partition);
            }
        })
        .flatMapCompletable(new Func1<DcpChannel, Completable>() {
            @Override
            public Completable call(DcpChannel channel) {
                return channel.openStream(partition, vbuuid, startSeqno, endSeqno, snapshotStartSeqno, snapshotEndSeqno);
            }
        })
        .retryWhen(anyOf(NotConnectedException.class)
            .max(Integer.MAX_VALUE)
            .delay(Delay.fixed(200, TimeUnit.MILLISECONDS))
            .doOnRetry(new Action4<Integer, Throwable, Long, TimeUnit>() {
                @Override
                public void call(Integer integer, Throwable throwable, Long aLong, TimeUnit timeUnit) {
                    LOGGER.debug("Rescheduling Stream Start for vbid {}, not connected (yet).", partition);
                }
            })
            .build()
        )
        .toCompletable();
}
Code example source: origin: georocket/georocket
.reduce((p1, p2) -> Pair.of(p1.getLeft() + p2.getLeft(),
p1.getRight() + p2.getRight()))
.flatMapCompletable(p -> {
long count = p.getLeft();
long notaccepted = p.getRight();
Code example source: origin: couchbase/java-dcp-client
.flatMapCompletable(new Func1<Short, Completable>() {
@Override
public Completable call(Short partition) {
Code example source: origin: couchbase/java-dcp-client
.flatMapCompletable(new Func1<Long, Completable>() {
@Override
public Completable call(Long aLong) {
Code example source: origin: georocket/georocket
})
.toList()
.flatMapCompletable(l -> {
if (!l.isEmpty()) {
return insertDocuments(l);
Code example source: origin: georocket/georocket
@Override
public void start() {
  log.info("Launching importer ...");
  store = new RxStore(StoreFactory.createStore(getVertx()));
  String storagePath = config().getString(ConfigConstants.STORAGE_FILE_PATH);
  incoming = storagePath + "/incoming";
  vertx.eventBus().<JsonObject>localConsumer(AddressConstants.IMPORTER_IMPORT)
    .toObservable()
    .onBackpressureBuffer() // unlimited buffer
    .flatMapCompletable(msg -> {
      // call onImport() but ignore errors. onImport() will handle errors for us.
      return onImport(msg).onErrorComplete();
    }, false, MAX_PARALLEL_IMPORTS)
    .subscribe(v -> {
      // ignore
    }, err -> {
      // This is bad. It will unsubscribe the consumer from the eventbus!
      // Should never happen anyhow. If it does, something else has
      // completely gone wrong.
      log.fatal("Could not import file", err);
    });
  vertx.eventBus().localConsumer(AddressConstants.IMPORTER_PAUSE, this::onPause);
}
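The example above uses the three-argument overload, which takes a delayErrors flag and a maxConcurrency limit in addition to the mapping function. The following is a minimal sketch of how that limit bounds the number of inner Completables running at once, assuming RxJava 1.x. The class name ConcurrencyLimitSketch, the constant MAX_PARALLEL, and the simulated work are hypothetical stand-ins and are not taken from GeoRocket.

```java
import java.util.concurrent.atomic.AtomicInteger;

import rx.Completable;
import rx.Observable;
import rx.schedulers.Schedulers;

public class ConcurrencyLimitSketch {
  static final int MAX_PARALLEL = 2; // hypothetical limit, playing the role of MAX_PARALLEL_IMPORTS

  // Returns {peak number of tasks observed running at once, number of tasks finished}.
  static int[] run() {
    AtomicInteger running = new AtomicInteger();
    AtomicInteger peak = new AtomicInteger();
    AtomicInteger done = new AtomicInteger();

    Observable.range(1, 6)
        .flatMapCompletable(i -> Completable.fromAction(() -> {
          int now = running.incrementAndGet();
          peak.accumulateAndGet(now, Math::max);
          try {
            Thread.sleep(20); // simulated work
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
          }
          running.decrementAndGet();
          done.incrementAndGet();
        }).subscribeOn(Schedulers.io()), false, MAX_PARALLEL) // delayErrors = false
        .toCompletable()
        .await(); // block until all six tasks have completed

    return new int[] {peak.get(), done.get()};
  }

  public static void main(String[] args) {
    int[] result = run();
    System.out.println("peak=" + result[0] + ", done=" + result[1]);
  }
}
```

The operator requests only MAX_PARALLEL items from upstream at first and one more each time an inner Completable finishes, so the observed peak should not exceed the limit while all six tasks still run.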
Code example source: origin: georocket/georocket
.buffer(BUFFER_TIMESPAN, TimeUnit.MILLISECONDS, maxBulkSize)
.flatMapCompletable(messages -> {
queuedAddMessages -= messages.size();
Code example source: origin: georocket/georocket
.flatMap(l -> chunks.map(DelegateChunkReadStream::new)
.<XMLChunkMeta, Pair<ChunkReadStream, XMLChunkMeta>>zipWith(l, Pair::of))
.flatMapCompletable(p -> m.merge(p.getLeft(), p.getRight(), bws))
.toCompletable()
.subscribe(() -> {
Code example source: origin: georocket/georocket
private void doMerge(TestContext context, Observable<Buffer> chunks,
    Observable<ChunkMeta> metas, String jsonContents) {
  MultiMerger m = new MultiMerger(false);
  BufferWriteStream bws = new BufferWriteStream();
  Async async = context.async();
  metas
    .flatMapSingle(meta -> m.init(meta).toSingleDefault(meta))
    .toList()
    .flatMap(l -> chunks.map(DelegateChunkReadStream::new)
      .<ChunkMeta, Pair<ChunkReadStream, ChunkMeta>>zipWith(l, Pair::of))
    .flatMapCompletable(p -> m.merge(p.getLeft(), p.getRight(), bws))
    .toCompletable()
    .subscribe(() -> {
      m.finish(bws);
      context.assertEquals(jsonContents, bws.getBuffer().toString("utf-8"));
      async.complete();
    }, context::fail);
}
Code example source: origin: georocket/georocket
private void doMerge(TestContext context, Observable<Buffer> chunks,
    Observable<GeoJsonChunkMeta> metas, String jsonContents, boolean optimistic) {
  GeoJsonMerger m = new GeoJsonMerger(optimistic);
  BufferWriteStream bws = new BufferWriteStream();
  Async async = context.async();
  Observable<GeoJsonChunkMeta> s;
  if (optimistic) {
    s = metas;
  } else {
    s = metas.flatMapSingle(meta -> m.init(meta).toSingleDefault(meta));
  }
  s.toList()
    .flatMap(l -> chunks.map(DelegateChunkReadStream::new)
      .<GeoJsonChunkMeta, Pair<ChunkReadStream, GeoJsonChunkMeta>>zipWith(l, Pair::of))
    .flatMapCompletable(p -> m.merge(p.getLeft(), p.getRight(), bws))
    .toCompletable()
    .subscribe(() -> {
      m.finish(bws);
      context.assertEquals(jsonContents, bws.getBuffer().toString("utf-8"));
      async.complete();
    }, context::fail);
}