Usage and code examples of the rx.Observable.map() method

Reposted by x33g5p2x on 2022-01-25, filed under Other

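This article collects code examples of the rx.Observable.map() method in Java and shows how Observable.map() is used in practice. The examples come mainly from GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they should serve as a useful reference. Details of the Observable.map() method:
Package path: rx.Observable
Class name: Observable
Method name: map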

Introduction to Observable.map

Returns an Observable that applies a specified function to each item emitted by the source Observable and emits the results of these function applications.

Scheduler: map does not operate by default on a particular Scheduler.
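
To make the description above concrete, here is a minimal sketch of what map does to a stream of items. It deliberately avoids the RxJava dependency: MiniObservable and MapSketch are hypothetical names written only for this illustration, not part of rx.Observable, and the sketch ignores errors, completion, unsubscription, and schedulers. With RxJava 1.x on the classpath, the corresponding call would be Observable.just(1, 2, 3).map(x -> x * 10).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical stand-in for rx.Observable: NOT the RxJava class, only enough
// of its shape (just / map / subscribe) to show what map does.
final class MiniObservable<T> {
    // The source pushes every item to whichever observer subscribes.
    private final Consumer<Consumer<? super T>> source;

    private MiniObservable(Consumer<Consumer<? super T>> source) {
        this.source = source;
    }

    // Emits the given items, in order, to each subscriber.
    @SafeVarargs
    static <T> MiniObservable<T> just(T... items) {
        List<T> copy = new ArrayList<>(Arrays.asList(items));
        return new MiniObservable<T>(observer -> copy.forEach(observer));
    }

    // Returns a new observable that applies fn to each upstream item and
    // emits the result; the upstream observable itself is left unchanged.
    <R> MiniObservable<R> map(Function<? super T, ? extends R> fn) {
        return new MiniObservable<R>(downstream ->
                source.accept(item -> downstream.accept(fn.apply(item))));
    }

    // Starts the emission and hands each item to onNext.
    void subscribe(Consumer<? super T> onNext) {
        source.accept(onNext);
    }
}

class MapSketch {
    // Collects what a subscriber receives after map(x -> x * 10).
    static List<Integer> run() {
        List<Integer> received = new ArrayList<>();
        MiniObservable.just(1, 2, 3)
                .map(x -> x * 10)
                .subscribe(received::add);
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [10, 20, 30]
    }
}
```

Each source item yields exactly one mapped item, in the same order. In this sketch the function runs on the calling thread; it does not model RxJava's schedulers.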

Code examples

Code example source: hidroh/materialistic

@Override
public void parse(String itemId, String url, Callback callback) {
  Observable.defer(() -> fromCache(itemId))
      .subscribeOn(mIoScheduler)
      .flatMap(content -> content != null ?
          Observable.just(content) : fromNetwork(itemId, url))
      .map(content -> AndroidUtils.TextUtils.equals(EMPTY_CONTENT, content) ? null : content)
      .observeOn(mMainThreadScheduler)
      .subscribe(callback::onResponse);
}

Code example source: apache/usergrid

public Observable<CollectionIoEvent<MvccEntity>> stageRunner( CollectionIoEvent<Entity> writeData,
                               WriteStart writeState ) {
  return Observable.just( writeData ).map( writeState ).flatMap( mvccEntityCollectionIoEvent -> {
    Observable<CollectionIoEvent<MvccEntity>> uniqueObservable =
      Observable.just( mvccEntityCollectionIoEvent ).subscribeOn( rxTaskScheduler.getAsyncIOScheduler() )
        .doOnNext( writeVerifyUnique );
    // optimistic verification
    Observable<CollectionIoEvent<MvccEntity>> optimisticObservable =
      Observable.just( mvccEntityCollectionIoEvent ).subscribeOn( rxTaskScheduler.getAsyncIOScheduler() )
        .doOnNext( writeOptimisticVerify );
    final Observable<CollectionIoEvent<MvccEntity>> zip =
      Observable.zip( uniqueObservable, optimisticObservable, ( unique, optimistic ) -> optimistic );
    return zip;
  } );
}

Code example source: PipelineAI/pipeline

protected RollingConcurrencyStream(final HystrixEventStream<HystrixCommandExecutionStarted> inputEventStream, final int numBuckets, final int bucketSizeInMs) {
  final List<Integer> emptyRollingMaxBuckets = new ArrayList<Integer>();
  for (int i = 0; i < numBuckets; i++) {
    emptyRollingMaxBuckets.add(0);
  }
  rollingMaxStream = inputEventStream
      .observe()
      .map(getConcurrencyCountFromEvent)
      .window(bucketSizeInMs, TimeUnit.MILLISECONDS)
      .flatMap(reduceStreamToMax)
      .startWith(emptyRollingMaxBuckets)
      .window(numBuckets, 1)
      .flatMap(reduceStreamToMax)
      .share()
      .onBackpressureDrop();
}

Code example source: HotBitmapGG/bilibili-android-client

// Excerpt: the source omits lines at each "// ..." marker.
// ...
.getLiveUrl(cid)
.compose(this.bindToLifecycle())
.map(responseBody -> {
  try {
    String str = responseBody.string();
    // ...
.flatMap(new Func1<String, Observable<Long>>() {
  @Override
  public Observable<Long> call(String s) {
    // ...

Code example source: apache/usergrid

@Override
public Observable<MarkedEdge> compactNode( final Id inputNode ) {
  final UUID startTime = UUIDGenerator.newTimeUUID();
  final Observable<MarkedEdge> nodeObservable =
    Observable.just( inputNode )
      .map( node -> nodeSerialization.getMaxVersion( scope, node ) )
      //.doOnNext(maxTimestamp -> logger.info("compactNode maxTimestamp={}", maxTimestamp.toString()))
      .takeWhile(maxTimestamp -> maxTimestamp.isPresent() )
      //map our delete listener
      .flatMap( timestamp -> nodeDeleteListener.receive( scope, inputNode, startTime ) );
  return ObservableTimer.time( nodeObservable, this.deleteNodeTimer );
}

Code example source: HotBitmapGG/bilibili-android-client

@Override
protected void loadData() {
  RetrofitHelper.getBiliAppAPI()
      .getRecommendedBannerInfo()
      .compose(bindToLifecycle())
      .map(RecommendBannerInfo::getData)
      .flatMap(new Func1<List<RecommendBannerInfo.DataBean>, Observable<RecommendInfo>>() {
        @Override
        public Observable<RecommendInfo> call(List<RecommendBannerInfo.DataBean> dataBeans) {
          recommendBanners.addAll(dataBeans);
          return RetrofitHelper.getBiliAppAPI().getRecommendedInfo();
        }
      })
      .compose(bindToLifecycle())
      .map(RecommendInfo::getResult)
      .subscribeOn(Schedulers.io())
      .observeOn(AndroidSchedulers.mainThread())
      .subscribe(resultBeans -> {
        results.addAll(resultBeans);
        finishTask();
      }, throwable -> initEmptyView());
}

Code example source: HotBitmapGG/bilibili-android-client

// Excerpt: the source omits lines at each "// ..." marker.
private void setUpEditText() {
  RxTextView.textChanges(mSearchEdit)
      .compose(this.bindToLifecycle())
      .map(CharSequence::toString)
      .observeOn(AndroidSchedulers.mainThread())
      .subscribe(s -> {
        // ...
      .filter(integer -> !TextUtils.isEmpty(mSearchEdit.getText().toString().trim()))
      .filter(integer -> integer == EditorInfo.IME_ACTION_SEARCH)
      .flatMap(new Func1<Integer, Observable<String>>() {
        @Override
        public Observable<String> call(Integer integer) {
          // ...
Code example source: vert-x3/vertx-examples

@Override
 public void start() throws Exception {
  HttpClient client = vertx.createHttpClient();

  // Create two requests
  HttpClientRequest req1 = client.request(HttpMethod.GET, 8080, "localhost", "/");
  HttpClientRequest req2 = client.request(HttpMethod.GET, 8080, "localhost", "/");

  // Turn the requests responses into Observable<JsonObject>
  Observable<JsonObject> obs1 = req1.toObservable().flatMap(HttpClientResponse::toObservable).
    map(buf -> new JsonObject(buf.toString("UTF-8")));
  Observable<JsonObject> obs2 = req2.toObservable().flatMap(HttpClientResponse::toObservable).
    map(buf -> new JsonObject(buf.toString("UTF-8")));

  // Combine the responses with the zip into a single response
  obs1.zipWith(obs2, (b1, b2) -> new JsonObject().put("req1", b1).put("req2", b2)).
    subscribe(json -> {
       System.out.println("Got combined result " + json);
      },
      err -> {
       err.printStackTrace();
      });

  req1.end();
  req2.end();
 }

Code example source: apache/usergrid

@Override
  public Observable<EdgeScope> getEdgesToEntities( final Observable<ApplicationScope> appScopes, final Optional<String> edgeType, final Optional<Edge> lastEdge) {

    return appScopes.flatMap( applicationScope -> {
      final GraphManager gm = graphManagerFactory.createEdgeManager( applicationScope );

      return edgesObservable.edgesFromSourceDescending( gm, applicationScope.getApplication(), edgeType, lastEdge )
                 .map( edge -> new EdgeScope(applicationScope, edge ));
    } );
  }

Code example source: HotBitmapGG/bilibili-android-client

@Override
protected void loadData() {
  RetrofitHelper.getBangumiAPI()
      .getBangumiAppIndex()
      .compose(bindToLifecycle())
      .flatMap(new Func1<BangumiAppIndexInfo, Observable<BangumiRecommendInfo>>() {
        @Override
        public Observable<BangumiRecommendInfo> call(BangumiAppIndexInfo bangumiAppIndexInfo) {
          banners.addAll(bangumiAppIndexInfo.getResult().getAd().getHead());
          bangumibobys.addAll(bangumiAppIndexInfo.getResult().getAd().getBody());
          seasonNewBangumis.addAll(bangumiAppIndexInfo.getResult().getPrevious().getList());
          season = bangumiAppIndexInfo.getResult().getPrevious().getSeason();
          newBangumiSerials.addAll(bangumiAppIndexInfo.getResult().getSerializing());
          return RetrofitHelper.getBangumiAPI().getBangumiRecommended();
        }
      })
      .compose(bindToLifecycle())
      .map(BangumiRecommendInfo::getResult)
      .subscribeOn(Schedulers.io())
      .observeOn(AndroidSchedulers.mainThread())
      .subscribe(resultBeans -> {
        bangumiRecommends.addAll(resultBeans);
        finishTask();
      }, throwable -> initEmptyView());
}

Code example source: PipelineAI/pipeline

protected RollingDistributionStream(final HystrixEventStream<Event> stream, final int numBuckets, final int bucketSizeInMs,
                  final Func2<Histogram, Event, Histogram> addValuesToBucket) {
  final List<Histogram> emptyDistributionsToStart = new ArrayList<Histogram>();
  for (int i = 0; i < numBuckets; i++) {
    emptyDistributionsToStart.add(CachedValuesHistogram.getNewHistogram());
  }
  final Func1<Observable<Event>, Observable<Histogram>> reduceBucketToSingleDistribution = new Func1<Observable<Event>, Observable<Histogram>>() {
    @Override
    public Observable<Histogram> call(Observable<Event> bucket) {
      return bucket.reduce(CachedValuesHistogram.getNewHistogram(), addValuesToBucket);
    }
  };
  rollingDistributionStream = stream
      .observe()
      .window(bucketSizeInMs, TimeUnit.MILLISECONDS) //stream of unaggregated buckets
      .flatMap(reduceBucketToSingleDistribution)     //stream of aggregated Histograms
      .startWith(emptyDistributionsToStart)          //stream of aggregated Histograms that starts with n empty
      .window(numBuckets, 1)                         //windowed stream: each OnNext is a stream of n Histograms
      .flatMap(reduceWindowToSingleDistribution)     //reduced stream: each OnNext is a single Histogram
      .map(cacheHistogramValues)                     //convert to CachedValueHistogram (commonly-accessed values are cached)
      .share()
      .onBackpressureDrop();
}

Code example source: hidroh/materialistic

// Excerpt: the source omits lines at each "// ..." marker.
// ...
.flatMap(response -> response.code() != HttpURLConnection.HTTP_MOVED_TEMP ?
    Observable.just(response) :
    Observable.error(new IOException()))
.flatMap(response -> {
  try {
    return Observable.just(new String[]{
    // ...
.map(array -> {
  array[1] = getInputValue(array[1], SUBMIT_PARAM_FNID);
  return array;
})
.flatMap(array -> !TextUtils.isEmpty(array[1]) ?
    Observable.just(array) :
    Observable.error(new IOException()))
// ...

Code example source: Netflix/servo

// Excerpt: the source omits lines at each "// ..." marker.
// ...
return response.getContent()
   .reduce(new ByteArrayOutputStream(), accumulator)
   .map(ByteArrayOutputStream::toByteArray);
};
// ...
  .flatMap(process)
  .subscribeOn(Schedulers.io())
  .toBlocking()
// ...

Code example source: apache/usergrid

@Override
public Observable<EntityIdScope> getEntities( final Observable<ApplicationScope> appScopes ) {
  return appScopes.flatMap( applicationScope -> {
    final GraphManager gm = graphManagerFactory.createEdgeManager( applicationScope );
    final Id applicationId = applicationScope.getApplication();
    //load all nodes that are targets of our application node.  I.E.
    // entities that have been saved
    final Observable<Id> entityNodes = targetIdObservable.getTargetNodes( gm, applicationId );
    //create our application node to emit since it's an entity as well
    final Observable<Id> applicationNode = Observable.just( applicationId );
    //merge both the specified application node and the entity node
    // so they all get used
    return Observable.merge( applicationNode, entityNodes ).
      map( id -> new EntityIdScope( applicationScope, id ) );
  } );
}

Code example source: ReactiveX/RxNetty

@Override
public ConnectionProvider<W, R> newProvider(Observable<HostConnector<W, R>> hosts) {
  return new ConnectionProviderImpl(hosts.map(new Func1<HostConnector<W, R>, HostHolder<W, R>>() {
    @Override
    public HostHolder<W, R> call(HostConnector<W, R> connector) {
      HostHolder<W, R> newHolder = strategy.toHolder(connector);
      connector.subscribe(newHolder.getEventListener());
      return newHolder;
    }
  }).flatMap(new Func1<HostHolder<W, R>, Observable<HostUpdate<W, R>>>() {
    @Override
    public Observable<HostUpdate<W, R>> call(HostHolder<W, R> holder) {
      return holder.getConnector()
             .getHost()
             .getCloseNotifier()
             .map(new VoidToAnythingCast<HostUpdate<W, R>>())
             .ignoreElements()
             .onErrorResumeNext(Observable.<HostUpdate<W, R>>empty())
             .concatWith(Observable.just(new HostUpdate<>(Action.Remove, holder)))
             .mergeWith(Observable.just(new HostUpdate<>(Action.Add, holder)));
    }
  }).flatMap(newCollector(collector.<W, R>newCollector()), 1).distinctUntilChanged());
}

Code example source: apache/usergrid

@Override
public Observable<Edge> deleteEdge( final Edge edge ) {
  GraphValidation.validateEdge( edge );
  final UUID startTimestamp = UUIDGenerator.newTimeUUID();
  final Observable<Edge> observable =
    Observable.create( new ObservableIterator<MarkedEdge>( "read edge versions" ) {
      @Override
      protected Iterator<MarkedEdge> getIterator() {
        return storageEdgeSerialization.getEdgeVersions( scope,
          new SimpleSearchByEdge( edge.getSourceNode(), edge.getType(), edge.getTargetNode(),
            Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING, Optional.absent() ) );
      }
    } ).filter( markedEdge -> markedEdge.isDeleted() ).flatMap( marked ->
      //fire our delete listener and wait for the results
      edgeDeleteListener.receive( scope, marked, startTimestamp ).doOnNext(
        //log them
        count -> logger.trace( "removed {} types for edge {} ", count, edge ) )
        //return the marked edge
        .map( count -> marked ) );
  return ObservableTimer.time( observable, deleteEdgeTimer );
}

Code example source: jooby-project/jooby

@Override
public <T> Observable<AsyncViewQueryResult<T>> query(final ViewQuery query) {
 return bucket.query(query)
   .map(result -> {
    Observable<List<T>> rows = result.rows()
      .flatMap(r -> r.document()
        .map(doc -> {
         EntityDocument<T> entity = converter.toEntity(doc, null);
         return entity.content();
        }))
      .toList();
    return new AsyncViewQueryResult(result.totalRows(), rows);
   });
}

Code example source: apache/usergrid

@Override
public Observable<FilterResult<Id>> call( final Observable<FilterResult<Id>> filterValueObservable ) {
  final GraphManager gm = graphManagerFactory.createEdgeManager( pipelineContext.getApplicationScope() );
  return filterValueObservable.flatMap( filterValue -> {
    final String edgeTypeName = getEdgeName();
    final Id id = filterValue.getValue();
    //create our search
    final SearchByEdge searchByEdge =
      new SimpleSearchByEdge( id, edgeTypeName, targetId, Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING,
        Optional.absent() );
    //load the versions of the edge, take the first since that's all we need to validate existence,
    //then emit the target node
    return gm.loadEdgeVersions( searchByEdge ).take( 1 )
      .map( edge -> edge.getTargetNode() )
      .map( targetId -> new FilterResult<>( targetId, filterValue.getPath() ) );
  } );
}

Code example source: apache/usergrid

@Override
public Observable<FilterResult<Entity>> call( final Observable<FilterResult<Id>> filterResultObservable ) {
  final ApplicationScope applicationScope = pipelineContext.getApplicationScope();
  final EntityCollectionManager entityCollectionManager =
    entityCollectionManagerFactory.createCollectionManager( applicationScope );
  //it's more efficient to make 1 network hop to get everything, then drop our results if required
  final Observable<FilterResult<Entity>> entityObservable =
    filterResultObservable.buffer( pipelineContext.getLimit() ).flatMap( bufferedIds -> {
      if (logger.isTraceEnabled()) {
        logger.trace("Attempting to batch load ids {}", bufferedIds);
      }
      final Observable<EntitySet> entitySetObservable =
        Observable.from( bufferedIds ).map( filterResultId -> filterResultId.getValue() ).toList()
             .flatMap( ids -> entityCollectionManager.load( ids ) );
      //now we have a collection, validate our candidate set is correct.
      GraphManager graphManager = graphManagerFactory.createEdgeManager(applicationScope);
      return entitySetObservable.map( entitySet -> new EntityVerifier( applicationScope, graphManager,
        entitySet, bufferedIds, readRepairFig ) )
                   .doOnNext( entityCollector -> entityCollector.merge() ).flatMap(
          entityCollector -> Observable.from( entityCollector.getResults() ) );
    } );
  return entityObservable;
}

Code example source: HotBitmapGG/bilibili-android-client

// Excerpt: the source omits lines at each "// ..." marker.
// ...
.getHDVideoUrl(cid, 4, ConstantUtil.VIDEO_TYPE_MP4)
.compose(bindToLifecycle())
.map(videoInfo -> Uri.parse(videoInfo.getDurl().get(0).getUrl()))
.observeOn(AndroidSchedulers.mainThread())
.flatMap(new Func1<Uri, Observable<BaseDanmakuParser>>() {
  @Override
  public Observable<BaseDanmakuParser> call(Uri uri) {
    // ...
