rx.Observable.map()方法的使用及代码示例

x33g5p2x  于2022-01-25 转载在 其他  
字(14.4k)|赞(0)|评价(0)|浏览(280)

本文整理了Java中rx.Observable.map()方法的一些代码示例,展示了Observable.map()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Observable.map()方法的具体详情如下:
包路径:rx.Observable
类名称:Observable
方法名:map

Observable.map介绍

[英]Returns an Observable that applies a specified function to each item emitted by the source Observable and emits the results of these function applications.

Scheduler: map does not operate by default on a particular Scheduler.
[中]

代码示例

代码示例来源:origin: hidroh/materialistic

  1. @Override
  2. public void parse(String itemId, String url, Callback callback) {
  3. Observable.defer(() -> fromCache(itemId))
  4. .subscribeOn(mIoScheduler)
  5. .flatMap(content -> content != null ?
  6. Observable.just(content) : fromNetwork(itemId, url))
  7. .map(content -> AndroidUtils.TextUtils.equals(EMPTY_CONTENT, content) ? null : content)
  8. .observeOn(mMainThreadScheduler)
  9. .subscribe(callback::onResponse);
  10. }

代码示例来源:origin: apache/usergrid

  1. public Observable<CollectionIoEvent<MvccEntity>> stageRunner( CollectionIoEvent<Entity> writeData,
  2. WriteStart writeState ) {
  3. return Observable.just( writeData ).map( writeState ).flatMap( mvccEntityCollectionIoEvent -> {
  4. Observable<CollectionIoEvent<MvccEntity>> uniqueObservable =
  5. Observable.just( mvccEntityCollectionIoEvent ).subscribeOn( rxTaskScheduler.getAsyncIOScheduler() )
  6. .doOnNext( writeVerifyUnique );
  7. // optimistic verification
  8. Observable<CollectionIoEvent<MvccEntity>> optimisticObservable =
  9. Observable.just( mvccEntityCollectionIoEvent ).subscribeOn( rxTaskScheduler.getAsyncIOScheduler() )
  10. .doOnNext( writeOptimisticVerify );
  11. final Observable<CollectionIoEvent<MvccEntity>> zip =
  12. Observable.zip( uniqueObservable, optimisticObservable, ( unique, optimistic ) -> optimistic );
  13. return zip;
  14. } );
  15. }

代码示例来源:origin: PipelineAI/pipeline

  1. protected RollingConcurrencyStream(final HystrixEventStream<HystrixCommandExecutionStarted> inputEventStream, final int numBuckets, final int bucketSizeInMs) {
  2. final List<Integer> emptyRollingMaxBuckets = new ArrayList<Integer>();
  3. for (int i = 0; i < numBuckets; i++) {
  4. emptyRollingMaxBuckets.add(0);
  5. }
  6. rollingMaxStream = inputEventStream
  7. .observe()
  8. .map(getConcurrencyCountFromEvent)
  9. .window(bucketSizeInMs, TimeUnit.MILLISECONDS)
  10. .flatMap(reduceStreamToMax)
  11. .startWith(emptyRollingMaxBuckets)
  12. .window(numBuckets, 1)
  13. .flatMap(reduceStreamToMax)
  14. .share()
  15. .onBackpressureDrop();
  16. }

代码示例来源:origin: HotBitmapGG/bilibili-android-client

  1. .getLiveUrl(cid)
  2. .compose(this.bindToLifecycle())
  3. .map(responseBody -> {
  4. try {
  5. String str = responseBody.string();
  6. .flatMap(new Func1<String, Observable<Long>>() {
  7. @Override
  8. public Observable<Long> call(String s) {

代码示例来源:origin: apache/usergrid

  1. @Override
  2. public Observable<MarkedEdge> compactNode( final Id inputNode ) {
  3. final UUID startTime = UUIDGenerator.newTimeUUID();
  4. final Observable<MarkedEdge> nodeObservable =
  5. Observable.just( inputNode )
  6. .map( node -> nodeSerialization.getMaxVersion( scope, node ) )
  7. //.doOnNext(maxTimestamp -> logger.info("compactNode maxTimestamp={}", maxTimestamp.toString()))
  8. .takeWhile(maxTimestamp -> maxTimestamp.isPresent() )
  9. //map our delete listener
  10. .flatMap( timestamp -> nodeDeleteListener.receive( scope, inputNode, startTime ) );
  11. return ObservableTimer.time( nodeObservable, this.deleteNodeTimer );
  12. }

代码示例来源:origin: HotBitmapGG/bilibili-android-client

  1. @Override
  2. protected void loadData() {
  3. RetrofitHelper.getBiliAppAPI()
  4. .getRecommendedBannerInfo()
  5. .compose(bindToLifecycle())
  6. .map(RecommendBannerInfo::getData)
  7. .flatMap(new Func1<List<RecommendBannerInfo.DataBean>, Observable<RecommendInfo>>() {
  8. @Override
  9. public Observable<RecommendInfo> call(List<RecommendBannerInfo.DataBean> dataBeans) {
  10. recommendBanners.addAll(dataBeans);
  11. return RetrofitHelper.getBiliAppAPI().getRecommendedInfo();
  12. }
  13. })
  14. .compose(bindToLifecycle())
  15. .map(RecommendInfo::getResult)
  16. .subscribeOn(Schedulers.io())
  17. .observeOn(AndroidSchedulers.mainThread())
  18. .subscribe(resultBeans -> {
  19. results.addAll(resultBeans);
  20. finishTask();
  21. }, throwable -> initEmptyView());
  22. }

代码示例来源:origin: HotBitmapGG/bilibili-android-client

  1. private void setUpEditText() {
  2. RxTextView.textChanges(mSearchEdit)
  3. .compose(this.bindToLifecycle())
  4. .map(CharSequence::toString)
  5. .observeOn(AndroidSchedulers.mainThread())
  6. .subscribe(s -> {
  7. .filter(integer -> !TextUtils.isEmpty(mSearchEdit.getText().toString().trim()))
  8. .filter(integer -> integer == EditorInfo.IME_ACTION_SEARCH)
  9. .flatMap(new Func1<Integer, Observable<String>>() {
  10. @Override
  11. public Observable<String> call(Integer integer) {

代码示例来源:origin: vert-x3/vertx-examples

  1. @Override
  2. public void start() throws Exception {
  3. HttpClient client = vertx.createHttpClient();
  4. // Create two requests
  5. HttpClientRequest req1 = client.request(HttpMethod.GET, 8080, "localhost", "/");
  6. HttpClientRequest req2 = client.request(HttpMethod.GET, 8080, "localhost", "/");
  7. // Turn the requests responses into Observable<JsonObject>
  8. Observable<JsonObject> obs1 = req1.toObservable().flatMap(HttpClientResponse::toObservable).
  9. map(buf -> new JsonObject(buf.toString("UTF-8")));
  10. Observable<JsonObject> obs2 = req2.toObservable().flatMap(HttpClientResponse::toObservable).
  11. map(buf -> new JsonObject(buf.toString("UTF-8")));
  12. // Combine the responses with the zip into a single response
  13. obs1.zipWith(obs2, (b1, b2) -> new JsonObject().put("req1", b1).put("req2", b2)).
  14. subscribe(json -> {
  15. System.out.println("Got combined result " + json);
  16. },
  17. err -> {
  18. err.printStackTrace();
  19. });
  20. req1.end();
  21. req2.end();
  22. }
  23. }

代码示例来源:origin: apache/usergrid

  1. @Override
  2. public Observable<EdgeScope> getEdgesToEntities( final Observable<ApplicationScope> appScopes, final Optional<String> edgeType, final Optional<Edge> lastEdge) {
  3. return appScopes.flatMap( applicationScope -> {
  4. final GraphManager gm = graphManagerFactory.createEdgeManager( applicationScope );
  5. return edgesObservable.edgesFromSourceDescending( gm, applicationScope.getApplication(), edgeType, lastEdge )
  6. .map( edge -> new EdgeScope(applicationScope, edge ));
  7. } );
  8. }
  9. }

代码示例来源:origin: HotBitmapGG/bilibili-android-client

  1. @Override
  2. protected void loadData() {
  3. RetrofitHelper.getBangumiAPI()
  4. .getBangumiAppIndex()
  5. .compose(bindToLifecycle())
  6. .flatMap(new Func1<BangumiAppIndexInfo, Observable<BangumiRecommendInfo>>() {
  7. @Override
  8. public Observable<BangumiRecommendInfo> call(BangumiAppIndexInfo bangumiAppIndexInfo) {
  9. banners.addAll(bangumiAppIndexInfo.getResult().getAd().getHead());
  10. bangumibobys.addAll(bangumiAppIndexInfo.getResult().getAd().getBody());
  11. seasonNewBangumis.addAll(bangumiAppIndexInfo.getResult().getPrevious().getList());
  12. season = bangumiAppIndexInfo.getResult().getPrevious().getSeason();
  13. newBangumiSerials.addAll(bangumiAppIndexInfo.getResult().getSerializing());
  14. return RetrofitHelper.getBangumiAPI().getBangumiRecommended();
  15. }
  16. })
  17. .compose(bindToLifecycle())
  18. .map(BangumiRecommendInfo::getResult)
  19. .subscribeOn(Schedulers.io())
  20. .observeOn(AndroidSchedulers.mainThread())
  21. .subscribe(resultBeans -> {
  22. bangumiRecommends.addAll(resultBeans);
  23. finishTask();
  24. }, throwable -> initEmptyView());
  25. }

代码示例来源:origin: PipelineAI/pipeline

  1. protected RollingDistributionStream(final HystrixEventStream<Event> stream, final int numBuckets, final int bucketSizeInMs,
  2. final Func2<Histogram, Event, Histogram> addValuesToBucket) {
  3. final List<Histogram> emptyDistributionsToStart = new ArrayList<Histogram>();
  4. for (int i = 0; i < numBuckets; i++) {
  5. emptyDistributionsToStart.add(CachedValuesHistogram.getNewHistogram());
  6. }
  7. final Func1<Observable<Event>, Observable<Histogram>> reduceBucketToSingleDistribution = new Func1<Observable<Event>, Observable<Histogram>>() {
  8. @Override
  9. public Observable<Histogram> call(Observable<Event> bucket) {
  10. return bucket.reduce(CachedValuesHistogram.getNewHistogram(), addValuesToBucket);
  11. }
  12. };
  13. rollingDistributionStream = stream
  14. .observe()
  15. .window(bucketSizeInMs, TimeUnit.MILLISECONDS) //stream of unaggregated buckets
  16. .flatMap(reduceBucketToSingleDistribution) //stream of aggregated Histograms
  17. .startWith(emptyDistributionsToStart) //stream of aggregated Histograms that starts with n empty
  18. .window(numBuckets, 1) //windowed stream: each OnNext is a stream of n Histograms
  19. .flatMap(reduceWindowToSingleDistribution) //reduced stream: each OnNext is a single Histogram
  20. .map(cacheHistogramValues) //convert to CachedValueHistogram (commonly-accessed values are cached)
  21. .share()
  22. .onBackpressureDrop();
  23. }

代码示例来源:origin: hidroh/materialistic

  1. .flatMap(response -> response.code() != HttpURLConnection.HTTP_MOVED_TEMP ?
  2. Observable.just(response) :
  3. Observable.error(new IOException()))
  4. .flatMap(response -> {
  5. try {
  6. return Observable.just(new String[]{
  7. .map(array -> {
  8. array[1] = getInputValue(array[1], SUBMIT_PARAM_FNID);
  9. return array;
  10. })
  11. .flatMap(array -> !TextUtils.isEmpty(array[1]) ?
  12. Observable.just(array) :
  13. Observable.error(new IOException()))

代码示例来源:origin: Netflix/servo

  1. return response.getContent()
  2. .reduce(new ByteArrayOutputStream(), accumulator)
  3. .map(ByteArrayOutputStream::toByteArray);
  4. };
  5. .flatMap(process)
  6. .subscribeOn(Schedulers.io())
  7. .toBlocking()

代码示例来源:origin: apache/usergrid

  1. @Override
  2. public Observable<EntityIdScope> getEntities( final Observable<ApplicationScope> appScopes ) {
  3. return appScopes.flatMap( applicationScope -> {
  4. final GraphManager gm = graphManagerFactory.createEdgeManager( applicationScope );
  5. final Id applicationId = applicationScope.getApplication();
  6. //load all nodes that are targets of our application node. I.E.
  7. // entities that have been saved
  8. final Observable<Id> entityNodes = targetIdObservable.getTargetNodes( gm, applicationId );
  9. //create our application node to emit since it's an entity as well
  10. final Observable<Id> applicationNode = Observable.just( applicationId );
  11. //merge both the specified application node and the entity node
  12. // so they all get used
  13. return Observable.merge( applicationNode, entityNodes ).
  14. map( id -> new EntityIdScope( applicationScope, id ) );
  15. } );
  16. }

代码示例来源:origin: ReactiveX/RxNetty

  1. @Override
  2. public ConnectionProvider<W, R> newProvider(Observable<HostConnector<W, R>> hosts) {
  3. return new ConnectionProviderImpl(hosts.map(new Func1<HostConnector<W, R>, HostHolder<W, R>>() {
  4. @Override
  5. public HostHolder<W, R> call(HostConnector<W, R> connector) {
  6. HostHolder<W, R> newHolder = strategy.toHolder(connector);
  7. connector.subscribe(newHolder.getEventListener());
  8. return newHolder;
  9. }
  10. }).flatMap(new Func1<HostHolder<W, R>, Observable<HostUpdate<W, R>>>() {
  11. @Override
  12. public Observable<HostUpdate<W, R>> call(HostHolder<W, R> holder) {
  13. return holder.getConnector()
  14. .getHost()
  15. .getCloseNotifier()
  16. .map(new VoidToAnythingCast<HostUpdate<W, R>>())
  17. .ignoreElements()
  18. .onErrorResumeNext(Observable.<HostUpdate<W, R>>empty())
  19. .concatWith(Observable.just(new HostUpdate<>(Action.Remove, holder)))
  20. .mergeWith(Observable.just(new HostUpdate<>(Action.Add, holder)));
  21. }
  22. }).flatMap(newCollector(collector.<W, R>newCollector()), 1).distinctUntilChanged());
  23. }

代码示例来源:origin: apache/usergrid

  1. @Override
  2. public Observable<Edge> deleteEdge( final Edge edge ) {
  3. GraphValidation.validateEdge( edge );
  4. final UUID startTimestamp = UUIDGenerator.newTimeUUID();
  5. final Observable<Edge> observable =
  6. Observable.create( new ObservableIterator<MarkedEdge>( "read edge versions" ) {
  7. @Override
  8. protected Iterator<MarkedEdge> getIterator() {
  9. return storageEdgeSerialization.getEdgeVersions( scope,
  10. new SimpleSearchByEdge( edge.getSourceNode(), edge.getType(), edge.getTargetNode(),
  11. Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING, Optional.absent() ) );
  12. }
  13. } ).filter( markedEdge -> markedEdge.isDeleted() ).flatMap( marked ->
  14. //fire our delete listener and wait for the results
  15. edgeDeleteListener.receive( scope, marked, startTimestamp ).doOnNext(
  16. //log them
  17. count -> logger.trace( "removed {} types for edge {} ", count, edge ) )
  18. //return the marked edge
  19. .map( count -> marked ) );
  20. return ObservableTimer.time( observable, deleteEdgeTimer );
  21. }

代码示例来源:origin: jooby-project/jooby

  1. @Override
  2. public <T> Observable<AsyncViewQueryResult<T>> query(final ViewQuery query) {
  3. return bucket.query(query)
  4. .map(result -> {
  5. Observable<List<T>> rows = result.rows()
  6. .flatMap(r -> r.document()
  7. .map(doc -> {
  8. EntityDocument<T> entity = converter.toEntity(doc, null);
  9. return entity.content();
  10. }))
  11. .toList();
  12. return new AsyncViewQueryResult(result.totalRows(), rows);
  13. });
  14. }

代码示例来源:origin: apache/usergrid

  1. @Override
  2. public Observable<FilterResult<Id>> call( final Observable<FilterResult<Id>> filterValueObservable ) {
  3. final GraphManager gm = graphManagerFactory.createEdgeManager( pipelineContext.getApplicationScope() );
  4. return filterValueObservable.flatMap( filterValue -> {
  5. final String edgeTypeName = getEdgeName();
  6. final Id id = filterValue.getValue();
  7. //create our search
  8. final SearchByEdge searchByEdge =
  9. new SimpleSearchByEdge( id, edgeTypeName, targetId, Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING,
  10. Optional.absent() );
  11. //load the versions of the edge, take the first since that's all we need to validate existence, then emit the target node
  12. return gm.loadEdgeVersions( searchByEdge ).take( 1 ).map( edge -> edge.getTargetNode() ).map( targetId -> new FilterResult<>(targetId, filterValue.getPath()));
  13. } );
  14. }

代码示例来源:origin: apache/usergrid

  1. @Override
  2. public Observable<FilterResult<Entity>> call( final Observable<FilterResult<Id>> filterResultObservable ) {
  3. final ApplicationScope applicationScope = pipelineContext.getApplicationScope();
  4. final EntityCollectionManager entityCollectionManager =
  5. entityCollectionManagerFactory.createCollectionManager( applicationScope );
  6. //it's more efficient to make 1 network hop to get everything, then drop our results if required
  7. final Observable<FilterResult<Entity>> entityObservable =
  8. filterResultObservable.buffer( pipelineContext.getLimit() ).flatMap( bufferedIds -> {
  9. if (logger.isTraceEnabled()) {
  10. logger.trace("Attempting to batch load ids {}", bufferedIds);
  11. }
  12. final Observable<EntitySet> entitySetObservable =
  13. Observable.from( bufferedIds ).map( filterResultId -> filterResultId.getValue() ).toList()
  14. .flatMap( ids -> entityCollectionManager.load( ids ) );
  15. //now we have a collection, validate our candidate set is correct.
  16. GraphManager graphManager = graphManagerFactory.createEdgeManager(applicationScope);
  17. return entitySetObservable.map( entitySet -> new EntityVerifier( applicationScope, graphManager,
  18. entitySet, bufferedIds, readRepairFig ) )
  19. .doOnNext( entityCollector -> entityCollector.merge() ).flatMap(
  20. entityCollector -> Observable.from( entityCollector.getResults() ) );
  21. } );
  22. return entityObservable;
  23. }

代码示例来源:origin: HotBitmapGG/bilibili-android-client

  1. .getHDVideoUrl(cid, 4, ConstantUtil.VIDEO_TYPE_MP4)
  2. .compose(bindToLifecycle())
  3. .map(videoInfo -> Uri.parse(videoInfo.getDurl().get(0).getUrl()))
  4. .observeOn(AndroidSchedulers.mainThread())
  5. .flatMap(new Func1<Uri, Observable<BaseDanmakuParser>>() {
  6. @Override
  7. public Observable<BaseDanmakuParser> call(Uri uri) {

相关文章

Observable类方法