本文整理了Java中rx.Observable.flatMapSingle()
方法的一些代码示例,展示了Observable.flatMapSingle()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Observable.flatMapSingle()
方法的具体详情如下:
包路径:rx.Observable
类名称:Observable
方法名:flatMapSingle
暂无
代码示例来源:origin: georocket/georocket
/**
* Determine the sizes of all given files
* @param files the files
* @param vertx the Vert.x instance
* @return an observable that emits pairs of file names and sizes
*/
private Observable<Pair<String, Long>> getFileSizes(List<String> files, Vertx vertx) {
FileSystem fs = vertx.fileSystem();
return Observable.from(files)
.flatMapSingle(path -> fs.rxProps(path).map(props -> Pair.of(path, props.size())));
}
代码示例来源:origin: couchbase/java-dcp-client
/**
* Helper method to return the failover logs for the given partitions (vbids).
*
* If the list is empty, the failover logs for all partitions will be returned. Note that the returned
* ByteBufs can be analyzed using the {@link DcpFailoverLogResponse} flyweight.
*
* @param vbids the partitions to return the failover logs from.
* @return an {@link Observable} containing all failover logs.
*/
public Observable<ByteBuf> failoverLogs(Short... vbids) {
List<Short> partitions = partitionsForVbids(numPartitions(), vbids);
LOGGER.debug("Asking for failover logs on partitions {}", partitions);
return Observable
.from(partitions)
.flatMapSingle(new Func1<Short, Single<ByteBuf>>() {
@Override
public Single<ByteBuf> call(Short p) {
return conductor.getFailoverLog(p);
}
});
}
代码示例来源:origin: org.jboss.hal/hal-flow
/** Executes multiple tasks in order. */
static <C extends FlowContext> Single<C> series(C context, Collection<? extends Task<C>> tasks) {
return Observable.from(tasks)
.flatMapSingle(task -> task.call(context).toSingleDefault(context), false, 1)
.doOnSubscribe(() -> context.progress.reset(tasks.size()))
.doOnNext(c -> c.progress.tick())
.doOnTerminate(context.progress::finish)
.lastOrDefault(context).toSingle();
}
}
代码示例来源:origin: georocket/georocket
.flatMapSingle(pair -> {
String path = pair.getKey().getKey();
Long size = pair.getKey().getValue();
代码示例来源:origin: couchbase/java-dcp-client
@SuppressWarnings("unchecked")
public Single<ByteBuf> getFailoverLog(final short partition) {
return Observable
.just(partition)
.map(new Func1<Short, DcpChannel>() {
@Override
public DcpChannel call(Short aShort) {
return masterChannelByPartition(partition);
}
})
.flatMapSingle(new Func1<DcpChannel, Single<ByteBuf>>() {
@Override
public Single<ByteBuf> call(DcpChannel channel) {
return channel.getFailoverLog(partition);
}
})
.retryWhen(anyOf(NotConnectedException.class)
.max(Integer.MAX_VALUE)
.delay(Delay.fixed(200, TimeUnit.MILLISECONDS))
.doOnRetry(new Action4<Integer, Throwable, Long, TimeUnit>() {
@Override
public void call(Integer integer, Throwable throwable, Long aLong, TimeUnit timeUnit) {
LOGGER.debug("Rescheduling Get Failover Log for vbid {}, not connected (yet).", partition);
}
})
.build()
).toSingle();
}
代码示例来源:origin: org.jboss.hal/hal-dmr
/**
* Executes the composite operation until the operation successfully returns and the precondition is met.
* The precondition receives the composite result of the operation.
*/
@SuppressWarnings("HardCodedStringLiteral")
public static Completable repeatCompositeUntil(Dispatcher dispatcher, int timeout, Composite composite,
@Nullable Predicate<CompositeResult> until) {
logger.debug("Repeat {} using {} seconds as timeout", composite, timeout);
Single<CompositeResult> execution = Single.fromEmitter(em -> dispatcher.execute(composite, em::onSuccess,
(op, fail) -> em.onSuccess(compositeFailure("Dispatcher failure: " + fail)),
(op, ex) -> em.onSuccess(compositeFailure("Dispatcher exception: " + ex.getMessage()))));
if (until == null) {
until = r -> r.stream().noneMatch(ModelNode::isFailure); // default: until success
}
return Observable
.interval(INTERVAL, MILLISECONDS) // execute a operation each INTERVAL millis
.doOnEach(n -> logger.debug("#{}: execute {}", n.getValue(), composite))
.flatMapSingle(n -> execution, false, 1)
.takeUntil(until::test) // until succeeded
.toCompletable().timeout(timeout, SECONDS); // wait succeeded or stop after timeout seconds
}
代码示例来源:origin: georocket/georocket
.map(RxStoreCursor::new)
.flatMapObservable(RxStoreCursor::toObservable)
.flatMapSingle(p -> store.rxGetOne(p.getRight())
.flatMap(crs -> merger.merge(crs, p.getLeft(), out)
代码示例来源:origin: georocket/georocket
/**
* Imports a JSON file from the given input stream into the store
* @param f the JSON file to read
* @param correlationId a unique identifier for this import process
* @param filename the name of the file currently being imported
* @param timestamp denotes when the import process has started
* @param layer the layer where the file should be stored (may be null)
* @param tags the list of tags to attach to the file (may be null)
* @param properties the map of properties to attach to the file (may be null)
* @return an observable that will emit the number 1 when a chunk has been imported
*/
protected Observable<Integer> importJSON(ReadStream<Buffer> f, String correlationId,
String filename, long timestamp, String layer, List<String> tags, Map<String, Object> properties) {
UTF8BomFilter bomFilter = new UTF8BomFilter();
StringWindow window = new StringWindow();
GeoJsonSplitter splitter = new GeoJsonSplitter(window);
AtomicInteger processing = new AtomicInteger(0);
return f.toObservable()
.map(Buffer::getDelegate)
.map(bomFilter::filter)
.doOnNext(window::append)
.compose(new JsonParserTransformer())
.flatMap(splitter::onEventObservable)
.flatMapSingle(result -> {
IndexMeta indexMeta = new IndexMeta(correlationId, filename,
timestamp, tags, properties, null);
return addToStoreWithPause(result, layer, indexMeta, f, processing)
.toSingleDefault(1);
});
}
代码示例来源:origin: couchbase/java-dcp-client
@SuppressWarnings("unchecked")
private Observable<ByteBuf> getSeqnosForChannel(final DcpChannel channel) {
return Observable
.just(channel)
.flatMapSingle(new Func1<DcpChannel, Single<ByteBuf>>() {
@Override
public Single<ByteBuf> call(DcpChannel channel) {
return channel.getSeqnos();
}
})
.retryWhen(anyOf(NotConnectedException.class)
.max(Integer.MAX_VALUE)
.delay(Delay.fixed(200, TimeUnit.MILLISECONDS))
.doOnRetry(new Action4<Integer, Throwable, Long, TimeUnit>() {
@Override
public void call(Integer integer, Throwable throwable, Long aLong, TimeUnit timeUnit) {
LOGGER.debug("Rescheduling get Seqnos for channel {}, not connected (yet).", channel);
}
})
.build()
);
}
代码示例来源:origin: org.jboss.hal/hal-dmr
/**
* Executes the operation until the operation successfully returns and the precondition is met. The precondition
* receives the result of the operation.
*/
@SuppressWarnings("HardCodedStringLiteral")
public static Completable repeatOperationUntil(Dispatcher dispatcher, int timeout, Operation operation,
@Nullable Predicate<ModelNode> until) {
logger.debug("Repeat {} using {} seconds timeout", operation.asCli(), timeout);
Single<ModelNode> execution = Single.fromEmitter(em -> dispatcher.execute(operation, em::onSuccess,
(op, fail) -> em.onSuccess(operationFailure("Dispatcher failure: " + fail)),
(op, ex) -> em.onSuccess(operationFailure("Dispatcher exception: " + ex.getMessage()))));
if (until == null) {
until = r -> !r.isFailure(); // default: until success
}
return Observable
.interval(INTERVAL, MILLISECONDS) // execute a operation each INTERVAL millis
.doOnEach(n -> logger.debug("#{}: execute {}", n.getValue(), operation.asCli()))
.flatMapSingle(n -> execution, false, 1)
.takeUntil(until::test) // until succeeded
.toCompletable().timeout(timeout, SECONDS); // wait succeeded or stop after timeout seconds
}
代码示例来源:origin: georocket/georocket
.flatMapSingle(result -> {
String crsString = fallbackCRSString;
if (crsIndexer.getCRS() != null) {
代码示例来源:origin: georocket/georocket
s = metas;
} else {
s = metas.flatMapSingle(meta -> m.init(meta).toSingleDefault(meta));
代码示例来源:origin: georocket/georocket
private void doMerge(TestContext context, Observable<Buffer> chunks,
Observable<ChunkMeta> metas, String jsonContents) {
MultiMerger m = new MultiMerger(false);
BufferWriteStream bws = new BufferWriteStream();
Async async = context.async();
metas
.flatMapSingle(meta -> m.init(meta).toSingleDefault(meta))
.toList()
.flatMap(l -> chunks.map(DelegateChunkReadStream::new)
.<ChunkMeta, Pair<ChunkReadStream, ChunkMeta>>zipWith(l, Pair::of))
.flatMapCompletable(p -> m.merge(p.getLeft(), p.getRight(), bws))
.toCompletable()
.subscribe(() -> {
m.finish(bws);
context.assertEquals(jsonContents, bws.getBuffer().toString("utf-8"));
async.complete();
}, context::fail);
}
代码示例来源:origin: georocket/georocket
private void doMerge(TestContext context, Observable<Buffer> chunks,
Observable<GeoJsonChunkMeta> metas, String jsonContents, boolean optimistic) {
GeoJsonMerger m = new GeoJsonMerger(optimistic);
BufferWriteStream bws = new BufferWriteStream();
Async async = context.async();
Observable<GeoJsonChunkMeta> s;
if (optimistic) {
s = metas;
} else {
s = metas.flatMapSingle(meta -> m.init(meta).toSingleDefault(meta));
}
s.toList()
.flatMap(l -> chunks.map(DelegateChunkReadStream::new)
.<GeoJsonChunkMeta, Pair<ChunkReadStream, GeoJsonChunkMeta>>zipWith(l, Pair::of))
.flatMapCompletable(p -> m.merge(p.getLeft(), p.getRight(), bws))
.toCompletable()
.subscribe(() -> {
m.finish(bws);
context.assertEquals(jsonContents, bws.getBuffer().toString("utf-8"));
async.complete();
}, context::fail);
}
代码示例来源:origin: georocket/georocket
/**
* Test if a service can be published an unpublished again
* @param context the test context
*/
@Test
public void unpublish(TestContext context) {
Vertx vertx = new Vertx(rule.vertx());
Async async = context.async();
ServiceDiscovery discovery = ServiceDiscovery.create(vertx);
Service.publishOnce("A", "a", discovery, vertx)
.andThen(Service.discover("A", discovery, vertx))
.count()
.doOnNext(count -> context.assertEquals(1, count))
.flatMap(v -> Service.discover("A", discovery, vertx))
.flatMapSingle(service -> service.unpublish(discovery).toSingleDefault(0))
.flatMap(v -> Service.discover("A", discovery, vertx))
.count()
.doOnTerminate(discovery::close)
.subscribe(count -> {
context.assertEquals(0, count);
async.complete();
}, context::fail);
}
}
内容来源于网络,如有侵权,请联系作者删除!