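This article collects code examples for the rx.Observable.map() method in Java and shows how Observable.map() is used in practice. The examples were extracted from selected projects found on platforms such as GitHub, Stack Overflow, and Maven, so they should serve as useful reference material. The details of Observable.map() are as follows: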
Package path: rx.Observable
Class name: Observable
Method name: map
Returns an Observable that applies a specified function to each item emitted by the source Observable and emits the results of these function applications.
Scheduler: map does not operate by default on a particular Scheduler.
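To make the contract above concrete, here is a minimal sketch that uses only the Java standard library. `MiniObservable` and `MapSketch` are hypothetical names invented for this illustration; they are a toy stand-in and not RxJava's `rx.Observable`. The sketch only shows the idea that `map` wraps a source and applies a function to each emitted item before passing the result downstream.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

/** Toy illustration of map semantics; NOT RxJava's implementation. */
public class MapSketch {

    /** Hypothetical push-based source: emits items to a subscriber on subscribe. */
    public interface MiniObservable<T> {
        void subscribe(Consumer<? super T> onNext);

        /** Returns a new source that applies fn to each item and emits the result. */
        default <R> MiniObservable<R> map(Function<? super T, ? extends R> fn) {
            return onNext -> this.subscribe(item -> onNext.accept(fn.apply(item)));
        }

        /** Creates a source that emits every element of the list, in order. */
        static <T> MiniObservable<T> from(List<T> items) {
            return onNext -> {
                for (T item : items) {
                    onNext.accept(item);
                }
            };
        }
    }

    /** Chains two map calls and collects what the subscriber receives. */
    public static List<String> demo() {
        List<String> received = new ArrayList<>();
        MiniObservable.from(Arrays.asList(1, 2, 3))
                .map(n -> n * 10)          // Integer -> Integer
                .map(n -> "item-" + n)     // Integer -> String
                .subscribe(received::add);
        return received;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [item-10, item-20, item-30]
    }
}
```

In this toy model the mapping function runs synchronously on whichever thread calls `subscribe`. With RxJava 1.x itself, the comparable chain would be `Observable.just(1, 2, 3).map(n -> n * 10)`.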
Code example source: hidroh/materialistic
@Override
public void parse(String itemId, String url, Callback callback) {
    Observable.defer(() -> fromCache(itemId))
            .subscribeOn(mIoScheduler)
            .flatMap(content -> content != null ?
                    Observable.just(content) : fromNetwork(itemId, url))
            .map(content -> AndroidUtils.TextUtils.equals(EMPTY_CONTENT, content) ? null : content)
            .observeOn(mMainThreadScheduler)
            .subscribe(callback::onResponse);
}
Code example source: apache/usergrid
public Observable<CollectionIoEvent<MvccEntity>> stageRunner( CollectionIoEvent<Entity> writeData,
                                                              WriteStart writeState ) {
    return Observable.just( writeData ).map( writeState ).flatMap( mvccEntityCollectionIoEvent -> {
        Observable<CollectionIoEvent<MvccEntity>> uniqueObservable =
            Observable.just( mvccEntityCollectionIoEvent ).subscribeOn( rxTaskScheduler.getAsyncIOScheduler() )
                      .doOnNext( writeVerifyUnique );
        // optimistic verification
        Observable<CollectionIoEvent<MvccEntity>> optimisticObservable =
            Observable.just( mvccEntityCollectionIoEvent ).subscribeOn( rxTaskScheduler.getAsyncIOScheduler() )
                      .doOnNext( writeOptimisticVerify );
        final Observable<CollectionIoEvent<MvccEntity>> zip =
            Observable.zip( uniqueObservable, optimisticObservable, ( unique, optimistic ) -> optimistic );
        return zip;
    } );
}
Code example source: PipelineAI/pipeline
protected RollingConcurrencyStream(final HystrixEventStream<HystrixCommandExecutionStarted> inputEventStream, final int numBuckets, final int bucketSizeInMs) {
    final List<Integer> emptyRollingMaxBuckets = new ArrayList<Integer>();
    for (int i = 0; i < numBuckets; i++) {
        emptyRollingMaxBuckets.add(0);
    }
    rollingMaxStream = inputEventStream
            .observe()
            .map(getConcurrencyCountFromEvent)
            .window(bucketSizeInMs, TimeUnit.MILLISECONDS)
            .flatMap(reduceStreamToMax)
            .startWith(emptyRollingMaxBuckets)
            .window(numBuckets, 1)
            .flatMap(reduceStreamToMax)
            .share()
            .onBackpressureDrop();
}
Code example source: HotBitmapGG/bilibili-android-client
.getLiveUrl(cid)
.compose(this.bindToLifecycle())
.map(responseBody -> {
try {
String str = responseBody.string();
.flatMap(new Func1<String, Observable<Long>>() {
@Override
public Observable<Long> call(String s) {
Code example source: apache/usergrid
@Override
public Observable<MarkedEdge> compactNode( final Id inputNode ) {
    final UUID startTime = UUIDGenerator.newTimeUUID();
    final Observable<MarkedEdge> nodeObservable =
        Observable.just( inputNode )
            .map( node -> nodeSerialization.getMaxVersion( scope, node ) )
            //.doOnNext(maxTimestamp -> logger.info("compactNode maxTimestamp={}", maxTimestamp.toString()))
            .takeWhile(maxTimestamp -> maxTimestamp.isPresent() )
            //map our delete listener
            .flatMap( timestamp -> nodeDeleteListener.receive( scope, inputNode, startTime ) );
    return ObservableTimer.time( nodeObservable, this.deleteNodeTimer );
}
Code example source: HotBitmapGG/bilibili-android-client
@Override
protected void loadData() {
    RetrofitHelper.getBiliAppAPI()
            .getRecommendedBannerInfo()
            .compose(bindToLifecycle())
            .map(RecommendBannerInfo::getData)
            .flatMap(new Func1<List<RecommendBannerInfo.DataBean>, Observable<RecommendInfo>>() {
                @Override
                public Observable<RecommendInfo> call(List<RecommendBannerInfo.DataBean> dataBeans) {
                    recommendBanners.addAll(dataBeans);
                    return RetrofitHelper.getBiliAppAPI().getRecommendedInfo();
                }
            })
            .compose(bindToLifecycle())
            .map(RecommendInfo::getResult)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(resultBeans -> {
                results.addAll(resultBeans);
                finishTask();
            }, throwable -> initEmptyView());
}
Code example source: HotBitmapGG/bilibili-android-client
private void setUpEditText() {
RxTextView.textChanges(mSearchEdit)
.compose(this.bindToLifecycle())
.map(CharSequence::toString)
.observeOn(AndroidSchedulers.mainThread())
.subscribe(s -> {
.filter(integer -> !TextUtils.isEmpty(mSearchEdit.getText().toString().trim()))
.filter(integer -> integer == EditorInfo.IME_ACTION_SEARCH)
.flatMap(new Func1<Integer, Observable<String>>() {
@Override
public Observable<String> call(Integer integer) {
Code example source: vert-x3/vertx-examples
@Override
public void start() throws Exception {
    HttpClient client = vertx.createHttpClient();
    // Create two requests
    HttpClientRequest req1 = client.request(HttpMethod.GET, 8080, "localhost", "/");
    HttpClientRequest req2 = client.request(HttpMethod.GET, 8080, "localhost", "/");
    // Turn the requests responses into Observable<JsonObject>
    Observable<JsonObject> obs1 = req1.toObservable().flatMap(HttpClientResponse::toObservable).
        map(buf -> new JsonObject(buf.toString("UTF-8")));
    Observable<JsonObject> obs2 = req2.toObservable().flatMap(HttpClientResponse::toObservable).
        map(buf -> new JsonObject(buf.toString("UTF-8")));
    // Combine the responses with the zip into a single response
    obs1.zipWith(obs2, (b1, b2) -> new JsonObject().put("req1", b1).put("req2", b2)).
        subscribe(json -> {
                System.out.println("Got combined result " + json);
            },
            err -> {
                err.printStackTrace();
            });
    req1.end();
    req2.end();
}
}
Code example source: apache/usergrid
@Override
public Observable<EdgeScope> getEdgesToEntities( final Observable<ApplicationScope> appScopes, final Optional<String> edgeType, final Optional<Edge> lastEdge) {
    return appScopes.flatMap( applicationScope -> {
        final GraphManager gm = graphManagerFactory.createEdgeManager( applicationScope );
        return edgesObservable.edgesFromSourceDescending( gm, applicationScope.getApplication(), edgeType, lastEdge )
                              .map( edge -> new EdgeScope(applicationScope, edge ));
    } );
}
}
Code example source: HotBitmapGG/bilibili-android-client
@Override
protected void loadData() {
    RetrofitHelper.getBangumiAPI()
            .getBangumiAppIndex()
            .compose(bindToLifecycle())
            .flatMap(new Func1<BangumiAppIndexInfo, Observable<BangumiRecommendInfo>>() {
                @Override
                public Observable<BangumiRecommendInfo> call(BangumiAppIndexInfo bangumiAppIndexInfo) {
                    banners.addAll(bangumiAppIndexInfo.getResult().getAd().getHead());
                    bangumibobys.addAll(bangumiAppIndexInfo.getResult().getAd().getBody());
                    seasonNewBangumis.addAll(bangumiAppIndexInfo.getResult().getPrevious().getList());
                    season = bangumiAppIndexInfo.getResult().getPrevious().getSeason();
                    newBangumiSerials.addAll(bangumiAppIndexInfo.getResult().getSerializing());
                    return RetrofitHelper.getBangumiAPI().getBangumiRecommended();
                }
            })
            .compose(bindToLifecycle())
            .map(BangumiRecommendInfo::getResult)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(resultBeans -> {
                bangumiRecommends.addAll(resultBeans);
                finishTask();
            }, throwable -> initEmptyView());
}
Code example source: PipelineAI/pipeline
protected RollingDistributionStream(final HystrixEventStream<Event> stream, final int numBuckets, final int bucketSizeInMs,
                                    final Func2<Histogram, Event, Histogram> addValuesToBucket) {
    final List<Histogram> emptyDistributionsToStart = new ArrayList<Histogram>();
    for (int i = 0; i < numBuckets; i++) {
        emptyDistributionsToStart.add(CachedValuesHistogram.getNewHistogram());
    }
    final Func1<Observable<Event>, Observable<Histogram>> reduceBucketToSingleDistribution = new Func1<Observable<Event>, Observable<Histogram>>() {
        @Override
        public Observable<Histogram> call(Observable<Event> bucket) {
            return bucket.reduce(CachedValuesHistogram.getNewHistogram(), addValuesToBucket);
        }
    };
    rollingDistributionStream = stream
            .observe()
            .window(bucketSizeInMs, TimeUnit.MILLISECONDS) //stream of unaggregated buckets
            .flatMap(reduceBucketToSingleDistribution)     //stream of aggregated Histograms
            .startWith(emptyDistributionsToStart)          //stream of aggregated Histograms that starts with n empty
            .window(numBuckets, 1)                         //windowed stream: each OnNext is a stream of n Histograms
            .flatMap(reduceWindowToSingleDistribution)     //reduced stream: each OnNext is a single Histogram
            .map(cacheHistogramValues)                     //convert to CachedValueHistogram (commonly-accessed values are cached)
            .share()
            .onBackpressureDrop();
}
Code example source: hidroh/materialistic
.flatMap(response -> response.code() != HttpURLConnection.HTTP_MOVED_TEMP ?
Observable.just(response) :
Observable.error(new IOException()))
.flatMap(response -> {
try {
return Observable.just(new String[]{
.map(array -> {
array[1] = getInputValue(array[1], SUBMIT_PARAM_FNID);
return array;
})
.flatMap(array -> !TextUtils.isEmpty(array[1]) ?
Observable.just(array) :
Observable.error(new IOException()))
Code example source: Netflix/servo
return response.getContent()
.reduce(new ByteArrayOutputStream(), accumulator)
.map(ByteArrayOutputStream::toByteArray);
};
.flatMap(process)
.subscribeOn(Schedulers.io())
.toBlocking()
Code example source: apache/usergrid
@Override
public Observable<EntityIdScope> getEntities( final Observable<ApplicationScope> appScopes ) {
    return appScopes.flatMap( applicationScope -> {
        final GraphManager gm = graphManagerFactory.createEdgeManager( applicationScope );
        final Id applicationId = applicationScope.getApplication();
        //load all nodes that are targets of our application node. I.E.
        // entities that have been saved
        final Observable<Id> entityNodes = targetIdObservable.getTargetNodes( gm, applicationId );
        //create our application node to emit since it's an entity as well
        final Observable<Id> applicationNode = Observable.just( applicationId );
        //merge both the specified application node and the entity node
        // so they all get used
        return Observable.merge( applicationNode, entityNodes ).
            map( id -> new EntityIdScope( applicationScope, id ) );
    } );
}
Code example source: ReactiveX/RxNetty
@Override
public ConnectionProvider<W, R> newProvider(Observable<HostConnector<W, R>> hosts) {
    return new ConnectionProviderImpl(hosts.map(new Func1<HostConnector<W, R>, HostHolder<W, R>>() {
        @Override
        public HostHolder<W, R> call(HostConnector<W, R> connector) {
            HostHolder<W, R> newHolder = strategy.toHolder(connector);
            connector.subscribe(newHolder.getEventListener());
            return newHolder;
        }
    }).flatMap(new Func1<HostHolder<W, R>, Observable<HostUpdate<W, R>>>() {
        @Override
        public Observable<HostUpdate<W, R>> call(HostHolder<W, R> holder) {
            return holder.getConnector()
                         .getHost()
                         .getCloseNotifier()
                         .map(new VoidToAnythingCast<HostUpdate<W, R>>())
                         .ignoreElements()
                         .onErrorResumeNext(Observable.<HostUpdate<W, R>>empty())
                         .concatWith(Observable.just(new HostUpdate<>(Action.Remove, holder)))
                         .mergeWith(Observable.just(new HostUpdate<>(Action.Add, holder)));
        }
    }).flatMap(newCollector(collector.<W, R>newCollector()), 1).distinctUntilChanged());
}
Code example source: apache/usergrid
@Override
public Observable<Edge> deleteEdge( final Edge edge ) {
    GraphValidation.validateEdge( edge );
    final UUID startTimestamp = UUIDGenerator.newTimeUUID();
    final Observable<Edge> observable =
        Observable.create( new ObservableIterator<MarkedEdge>( "read edge versions" ) {
            @Override
            protected Iterator<MarkedEdge> getIterator() {
                return storageEdgeSerialization.getEdgeVersions( scope,
                    new SimpleSearchByEdge( edge.getSourceNode(), edge.getType(), edge.getTargetNode(),
                        Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING, Optional.absent() ) );
            }
        } ).filter( markedEdge -> markedEdge.isDeleted() ).flatMap( marked ->
            //fire our delete listener and wait for the results
            edgeDeleteListener.receive( scope, marked, startTimestamp ).doOnNext(
                //log them
                count -> logger.trace( "removed {} types for edge {} ", count, edge ) )
                //return the marked edge
                .map( count -> marked ) );
    return ObservableTimer.time( observable, deleteEdgeTimer );
}
Code example source: jooby-project/jooby
@Override
public <T> Observable<AsyncViewQueryResult<T>> query(final ViewQuery query) {
    return bucket.query(query)
        .map(result -> {
            Observable<List<T>> rows = result.rows()
                .flatMap(r -> r.document()
                    .map(doc -> {
                        EntityDocument<T> entity = converter.toEntity(doc, null);
                        return entity.content();
                    }))
                .toList();
            return new AsyncViewQueryResult(result.totalRows(), rows);
        });
}
Code example source: apache/usergrid
@Override
public Observable<FilterResult<Id>> call( final Observable<FilterResult<Id>> filterValueObservable ) {
    final GraphManager gm = graphManagerFactory.createEdgeManager( pipelineContext.getApplicationScope() );
    return filterValueObservable.flatMap( filterValue -> {
        final String edgeTypeName = getEdgeName();
        final Id id = filterValue.getValue();
        //create our search
        final SearchByEdge searchByEdge =
            new SimpleSearchByEdge( id, edgeTypeName, targetId, Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING,
                Optional.absent() );
        //load the versions of the edge, take the first since that's all we need to validate existence, then emit the target node
        return gm.loadEdgeVersions( searchByEdge ).take( 1 ).map( edge -> edge.getTargetNode() ).map( targetId -> new FilterResult<>(targetId, filterValue.getPath()));
    } );
}
Code example source: apache/usergrid
@Override
public Observable<FilterResult<Entity>> call( final Observable<FilterResult<Id>> filterResultObservable ) {
    final ApplicationScope applicationScope = pipelineContext.getApplicationScope();
    final EntityCollectionManager entityCollectionManager =
        entityCollectionManagerFactory.createCollectionManager( applicationScope );
    //it's more efficient to make 1 network hop to get everything, then drop our results if required
    final Observable<FilterResult<Entity>> entityObservable =
        filterResultObservable.buffer( pipelineContext.getLimit() ).flatMap( bufferedIds -> {
            if (logger.isTraceEnabled()) {
                logger.trace("Attempting to batch load ids {}", bufferedIds);
            }
            final Observable<EntitySet> entitySetObservable =
                Observable.from( bufferedIds ).map( filterResultId -> filterResultId.getValue() ).toList()
                          .flatMap( ids -> entityCollectionManager.load( ids ) );
            //now we have a collection, validate our candidate set is correct.
            GraphManager graphManager = graphManagerFactory.createEdgeManager(applicationScope);
            return entitySetObservable.map( entitySet -> new EntityVerifier( applicationScope, graphManager,
                                          entitySet, bufferedIds, readRepairFig ) )
                                      .doOnNext( entityCollector -> entityCollector.merge() ).flatMap(
                    entityCollector -> Observable.from( entityCollector.getResults() ) );
        } );
    return entityObservable;
}
Code example source: HotBitmapGG/bilibili-android-client
.getHDVideoUrl(cid, 4, ConstantUtil.VIDEO_TYPE_MP4)
.compose(bindToLifecycle())
.map(videoInfo -> Uri.parse(videoInfo.getDurl().get(0).getUrl()))
.observeOn(AndroidSchedulers.mainThread())
.flatMap(new Func1<Uri, Observable<BaseDanmakuParser>>() {
@Override
public Observable<BaseDanmakuParser> call(Uri uri) {