rx.Observable.concatMap()方法的使用及代码示例

x33g5p2x  于2022-01-25 转载在 其他  
字(9.4k)|赞(0)|评价(0)|浏览(162)

本文整理了Java中rx.Observable.concatMap()方法的一些代码示例,展示了Observable.concatMap()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Observable.concatMap()方法的具体详情如下:
包路径:rx.Observable
类名称:Observable
方法名:concatMap

Observable.concatMap介绍

[英]Returns a new Observable that emits items resulting from applying a function that you supply to each item emitted by the source Observable, where that function returns an Observable, and then emitting the items that result from concatinating those resulting Observables.

Scheduler: concatMap does not operate by default on a particular Scheduler.
[中]返回一个新的Observable,该Observable将向源Observable发出的每个项应用一个函数,该函数将返回一个Observable,然后发送通过将这些结果Observable合并而产生的项。
调度器:默认情况下,concatMap不会在特定的调度器上运行。

代码示例

代码示例来源:origin: konmik/nucleus

@Override
public void onCreate(Bundle savedState) {
  super.onCreate(savedState);
  restartableReplay(REQUEST_ITEMS,
    () -> pageRequests.startWith(0)
      .concatMap(page ->
        api.getItems(name.split("\\s+")[0], name.split("\\s+")[1], page)
          .map(data -> new PageBundle<>(page, data))
          .delay(pref.getInt("delay", 0), TimeUnit.SECONDS)
          .observeOn(mainThread())),
    (activity, page) -> activity.onItems(page, name),
    MainFragment::onNetworkError);
}

代码示例来源:origin: davidmoten/rxjava-jdbc

private static <T> Observable<T> beginTransactionOnNext(final Database db,
    Observable<T> source) {
  return source.concatMap(new Func1<T, Observable<T>>() {
    @Override
    public Observable<T> call(T t) {
      return db.beginTransaction().map(Functions.constant(t));
    }
  });
}

代码示例来源:origin: davidmoten/rxjava-jdbc

@Override
  public Observable<Boolean> call(Observable<Observable<T>> source) {
    return source.concatMap(new Func1<Observable<T>, Observable<Boolean>>() {
      @Override
      public Observable<Boolean> call(Observable<T> source) {
        if (isCommit)
          return commit(source);
        else
          return rollback(source);
      }
    });
  }
};

代码示例来源:origin: davidmoten/rxjava-jdbc

/**
 * Emits true for commit and false for rollback.
 * 
 * @param isCommit
 * @param db
 * @param source
 * @return
 */
private static <T> Observable<Boolean> commitOrRollbackOnNext(final boolean isCommit,
    final Database db, Observable<T> source) {
  return source.concatMap(new Func1<T, Observable<Boolean>>() {
    @Override
    public Observable<Boolean> call(T t) {
      if (isCommit)
        return db.commit();
      else
        return db.rollback();
    }
  });
}

代码示例来源:origin: ReactiveX/RxNetty

.doOnError(LogErrorAction.INSTANCE)
.concatMap(new IdleConnectionCleanupTask())
.onErrorResumeNext(new Func1<Throwable, Observable<Void>>() {
  @Override

代码示例来源:origin: jhusain/learnrxjava

/**
 * Flatten out all video in the stream of Movies into a stream of videoIDs
 * 
 * @param movieLists
 * @return Observable of Integers of Movies.videos.id
 */
public Observable<Integer> exerciseConcatMap(Observable<Movies> movies) {
  return movies.<Integer> concatMap(ml -> {
    return ml.videos.map(v -> v.id);
  });
}

代码示例来源:origin: ribot/ribot-app-android

/**
 * Sign in with a Google account.
 * 1. Retrieve an google auth code for the given account
 * 2. Sends code and account to API
 * 3. If success, saves ribot profile and API access token in preferences
 */
public Observable<Ribot> signIn(Account account) {
  return mGoogleAuthHelper.retrieveAuthTokenAsObservable(account)
      .concatMap(new Func1<String, Observable<SignInResponse>>() {
        @Override
        public Observable<SignInResponse> call(String googleAccessToken) {
          return mRibotService.signIn(new SignInRequest(googleAccessToken));
        }
      })
      .map(new Func1<SignInResponse, Ribot>() {
        @Override
        public Ribot call(SignInResponse signInResponse) {
          mPreferencesHelper.putAccessToken(signInResponse.accessToken);
          mPreferencesHelper.putSignedInRibot(signInResponse.ribot);
          return signInResponse.ribot;
        }
      });
}

代码示例来源:origin: davidmoten/rxjava-jdbc

/**
 * Returns the results of running a select query with all sets of
 * parameters.
 * 
 * @return
 */
public <T> Observable<T> execute(ResultSetMapper<? extends T> function) {
  return bufferedParameters(this)
      // execute once per set of parameters
      .concatMap(executeOnce(function));
}

代码示例来源:origin: konmik/nucleus

.concatMap(new Func1<Integer, Observable<String>>() {
  @Override
  public Observable<String> call(final Integer page) {

代码示例来源:origin: ribot/ribot-app-android

public Observable<Encounter> performBeaconEncounter(String uuid, int major, int minor) {
  Observable<RegisteredBeacon> errorObservable = Observable.error(
      new BeaconNotRegisteredException(uuid, major, minor));
  return mDatabaseHelper.findRegisteredBeacon(uuid, major, minor)
      .switchIfEmpty(errorObservable)
      .concatMap(new Func1<RegisteredBeacon, Observable<Encounter>>() {
        @Override
        public Observable<Encounter> call(RegisteredBeacon registeredBeacon) {
          return performBeaconEncounter(registeredBeacon.id);
        }
      });
}

代码示例来源:origin: davidmoten/rxjava-jdbc

/**
 * If the number of parameters in a query is >0 then group the parameters in
 * lists of that number in size but only after the dependencies have been
 * completed. If the number of parameteres is zero then return an observable
 * containing one item being an empty list.
 * 
 * @param query
 * @return
 */
static Observable<List<Parameter>> bufferedParameters(Query query) {
  int numParamsPerQuery = numParamsPerQuery(query);
  if (numParamsPerQuery > 0)
    // we don't check that parameters is empty after this because by
    // general design we want nothing to happen if a query is passed no
    // parameters when it expects them
    return parametersAfterDependencies(query).concatMap(FLATTEN_NAMED_MAPS)
        .buffer(numParamsPerQuery);
  else
    return singleIntegerAfterDependencies(query).map(TO_EMPTY_PARAMETER_LIST);
}

代码示例来源:origin: davidmoten/rxjava-jdbc

static <T> Observable<T> get(QueryUpdate<T> queryUpdate) {
  if (queryUpdate.context().batchSize() > 1) {
    return bufferedParameters(queryUpdate) //
        // mark the last parameter list as such
        .compose(Transformers.mapLast(toFinalArrayList))//
        // execute query for each set of parameters
        .concatMap(queryUpdate.executeOnce());
  } else {
    return bufferedParameters(queryUpdate) //
        // execute query for each set of parameters
        .concatMap(queryUpdate.executeOnce());
  }
}

代码示例来源:origin: davidmoten/rxjava-jdbc

@Override
public Observable<T> call(Observable<R> source) {
  if (operatorType == OperatorType.PARAMETER)
    return builder.parameters(source).get(function);
  else if (operatorType == OperatorType.DEPENDENCY)
    // dependency
    return builder.dependsOn(source).get(function);
  else // PARAMETER_LIST
  {
    @SuppressWarnings("unchecked")
    Observable<Observable<Object>> obs = (Observable<Observable<Object>>) source;
    return obs.concatMap(new Func1<Observable<Object>, Observable<T>>() {
      @Override
      public Observable<T> call(Observable<Object> parameters) {
        return builder.parameters(parameters).get(function);
      }
    });
  }
}

代码示例来源:origin: ribot/ribot-app-android

public Observable<Void> syncRegisteredBeacons() {
  String auth = RibotService.Util.buildAuthorization(mPreferencesHelper.getAccessToken());
  return mRibotService.getRegisteredBeacons(auth)
      .concatMap(new Func1<List<RegisteredBeacon>, Observable<Void>>() {
        @Override
        public Observable<Void> call(List<RegisteredBeacon> beacons) {
          return mDatabaseHelper.setRegisteredBeacons(beacons);
        }
      })
      .doOnCompleted(postEventSafelyAction(new BusEvent.BeaconsSyncCompleted()));
}

代码示例来源:origin: com.netflix.hystrix/hystrix-metrics-event-stream

/* package-private */ HystrixMetricsStreamServlet(Observable<HystrixDashboardStream.DashboardData> sampleStream, int pausePollerThreadDelayInMs) {
  super(sampleStream.concatMap(new Func1<HystrixDashboardStream.DashboardData, Observable<String>>() {
    @Override
    public Observable<String> call(HystrixDashboardStream.DashboardData dashboardData) {
      return Observable.from(SerialHystrixDashboardData.toMultipleJsonStrings(dashboardData));
    }
  }), pausePollerThreadDelayInMs);
}

代码示例来源:origin: org.hawkular.metrics/hawkular-metrics-core-service

@Override
public <T extends Number> Observable<NamedDataPoint<Double>> findRateData(List<MetricId<T>> ids, long start,
                                 long end, int limit, Order order) {
  return Observable.from(ids).concatMap(id -> findRateData(id, start, end, limit, order)
      .map(dataPoint -> new NamedDataPoint<>(id.getName(), dataPoint)));
}

代码示例来源:origin: hawkular/hawkular-metrics

@Override
public <T extends Number> Observable<NamedDataPoint<Double>> findRateData(List<MetricId<T>> ids, long start,
                                 long end, int limit, Order order) {
  return Observable.from(ids).concatMap(id -> findRateData(id, start, end, limit, order)
      .map(dataPoint -> new NamedDataPoint<>(id.getName(), dataPoint)));
}

代码示例来源:origin: hawkular/hawkular-metrics

@Override
public <T> Observable<NamedDataPoint<T>> findDataPoints(String tenantId, MetricType<T> metricType,
    String tagFilters, long start, long end, int limit, Order order) {
  return findMetricIdentifiersWithFilters(tenantId, metricType, tagFilters)
      .concatMap(id -> findDataPoints(id, start, end, limit, order)
          .map(dataPoint -> new NamedDataPoint<>(id.getName(), dataPoint)));
}

代码示例来源:origin: com.trunk.rx.json/rxjava-json-core

protected JsonArray(Observable<T> elements) {
 super(
  Observable.<JsonToken>just(JsonArrayStart.instance())
   .concatWith(
    elements
     .concatMap(jsonElement -> Observable.<JsonToken>just(JsonComma.instance()).concatWith(jsonElement))
     .skip(1)
   )
   .concatWith(Observable.just(JsonArrayEnd.instance()))
 );
 this.elements = elements;
}

代码示例来源:origin: VictorAlbertos/RxGcm

public Observable<Boolean> sendGcmNotificationRequestingSupply(String title, String body) {
  return RxGcm.Notifications.currentToken()
      .map(token -> new Payload(token, title, body, TARGET_SUPPLY_GCM))
      .concatMap(payload -> apiGcmServer.sendNotification(payload))
      .map(gcmResponseServerResponse -> gcmResponseServerResponse.body().success())
      .onErrorReturn(throwable -> false);
}

相关文章

Observable类方法