本文整理了Java中rx.Observable.concatMap()
方法的一些代码示例,展示了Observable.concatMap()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Observable.concatMap()
方法的具体详情如下:
包路径:rx.Observable
类名称:Observable
方法名:concatMap
[英]Returns a new Observable that emits items resulting from applying a function that you supply to each item emitted by the source Observable, where that function returns an Observable, and then emitting the items that result from concatinating those resulting Observables.
Scheduler: concatMap does not operate by default on a particular Scheduler.
[中]返回一个新的Observable,该Observable将向源Observable发出的每个项应用一个函数,该函数将返回一个Observable,然后发送通过将这些结果Observable合并而产生的项。
调度器:默认情况下,concatMap不会在特定的调度器上运行。
代码示例来源:origin: konmik/nucleus
@Override
public void onCreate(Bundle savedState) {
super.onCreate(savedState);
restartableReplay(REQUEST_ITEMS,
() -> pageRequests.startWith(0)
.concatMap(page ->
api.getItems(name.split("\\s+")[0], name.split("\\s+")[1], page)
.map(data -> new PageBundle<>(page, data))
.delay(pref.getInt("delay", 0), TimeUnit.SECONDS)
.observeOn(mainThread())),
(activity, page) -> activity.onItems(page, name),
MainFragment::onNetworkError);
}
代码示例来源:origin: davidmoten/rxjava-jdbc
private static <T> Observable<T> beginTransactionOnNext(final Database db,
Observable<T> source) {
return source.concatMap(new Func1<T, Observable<T>>() {
@Override
public Observable<T> call(T t) {
return db.beginTransaction().map(Functions.constant(t));
}
});
}
代码示例来源:origin: davidmoten/rxjava-jdbc
@Override
public Observable<Boolean> call(Observable<Observable<T>> source) {
return source.concatMap(new Func1<Observable<T>, Observable<Boolean>>() {
@Override
public Observable<Boolean> call(Observable<T> source) {
if (isCommit)
return commit(source);
else
return rollback(source);
}
});
}
};
代码示例来源:origin: davidmoten/rxjava-jdbc
/**
* Emits true for commit and false for rollback.
*
* @param isCommit
* @param db
* @param source
* @return
*/
private static <T> Observable<Boolean> commitOrRollbackOnNext(final boolean isCommit,
final Database db, Observable<T> source) {
return source.concatMap(new Func1<T, Observable<Boolean>>() {
@Override
public Observable<Boolean> call(T t) {
if (isCommit)
return db.commit();
else
return db.rollback();
}
});
}
代码示例来源:origin: ReactiveX/RxNetty
.doOnError(LogErrorAction.INSTANCE)
.concatMap(new IdleConnectionCleanupTask())
.onErrorResumeNext(new Func1<Throwable, Observable<Void>>() {
@Override
代码示例来源:origin: jhusain/learnrxjava
/**
* Flatten out all video in the stream of Movies into a stream of videoIDs
*
* @param movieLists
* @return Observable of Integers of Movies.videos.id
*/
public Observable<Integer> exerciseConcatMap(Observable<Movies> movies) {
return movies.<Integer> concatMap(ml -> {
return ml.videos.map(v -> v.id);
});
}
代码示例来源:origin: ribot/ribot-app-android
/**
* Sign in with a Google account.
* 1. Retrieve an google auth code for the given account
* 2. Sends code and account to API
* 3. If success, saves ribot profile and API access token in preferences
*/
public Observable<Ribot> signIn(Account account) {
return mGoogleAuthHelper.retrieveAuthTokenAsObservable(account)
.concatMap(new Func1<String, Observable<SignInResponse>>() {
@Override
public Observable<SignInResponse> call(String googleAccessToken) {
return mRibotService.signIn(new SignInRequest(googleAccessToken));
}
})
.map(new Func1<SignInResponse, Ribot>() {
@Override
public Ribot call(SignInResponse signInResponse) {
mPreferencesHelper.putAccessToken(signInResponse.accessToken);
mPreferencesHelper.putSignedInRibot(signInResponse.ribot);
return signInResponse.ribot;
}
});
}
代码示例来源:origin: davidmoten/rxjava-jdbc
/**
* Returns the results of running a select query with all sets of
* parameters.
*
* @return
*/
public <T> Observable<T> execute(ResultSetMapper<? extends T> function) {
return bufferedParameters(this)
// execute once per set of parameters
.concatMap(executeOnce(function));
}
代码示例来源:origin: konmik/nucleus
.concatMap(new Func1<Integer, Observable<String>>() {
@Override
public Observable<String> call(final Integer page) {
代码示例来源:origin: ribot/ribot-app-android
public Observable<Encounter> performBeaconEncounter(String uuid, int major, int minor) {
Observable<RegisteredBeacon> errorObservable = Observable.error(
new BeaconNotRegisteredException(uuid, major, minor));
return mDatabaseHelper.findRegisteredBeacon(uuid, major, minor)
.switchIfEmpty(errorObservable)
.concatMap(new Func1<RegisteredBeacon, Observable<Encounter>>() {
@Override
public Observable<Encounter> call(RegisteredBeacon registeredBeacon) {
return performBeaconEncounter(registeredBeacon.id);
}
});
}
代码示例来源:origin: davidmoten/rxjava-jdbc
/**
* If the number of parameters in a query is >0 then group the parameters in
* lists of that number in size but only after the dependencies have been
* completed. If the number of parameteres is zero then return an observable
* containing one item being an empty list.
*
* @param query
* @return
*/
static Observable<List<Parameter>> bufferedParameters(Query query) {
int numParamsPerQuery = numParamsPerQuery(query);
if (numParamsPerQuery > 0)
// we don't check that parameters is empty after this because by
// general design we want nothing to happen if a query is passed no
// parameters when it expects them
return parametersAfterDependencies(query).concatMap(FLATTEN_NAMED_MAPS)
.buffer(numParamsPerQuery);
else
return singleIntegerAfterDependencies(query).map(TO_EMPTY_PARAMETER_LIST);
}
代码示例来源:origin: davidmoten/rxjava-jdbc
static <T> Observable<T> get(QueryUpdate<T> queryUpdate) {
if (queryUpdate.context().batchSize() > 1) {
return bufferedParameters(queryUpdate) //
// mark the last parameter list as such
.compose(Transformers.mapLast(toFinalArrayList))//
// execute query for each set of parameters
.concatMap(queryUpdate.executeOnce());
} else {
return bufferedParameters(queryUpdate) //
// execute query for each set of parameters
.concatMap(queryUpdate.executeOnce());
}
}
代码示例来源:origin: davidmoten/rxjava-jdbc
@Override
public Observable<T> call(Observable<R> source) {
if (operatorType == OperatorType.PARAMETER)
return builder.parameters(source).get(function);
else if (operatorType == OperatorType.DEPENDENCY)
// dependency
return builder.dependsOn(source).get(function);
else // PARAMETER_LIST
{
@SuppressWarnings("unchecked")
Observable<Observable<Object>> obs = (Observable<Observable<Object>>) source;
return obs.concatMap(new Func1<Observable<Object>, Observable<T>>() {
@Override
public Observable<T> call(Observable<Object> parameters) {
return builder.parameters(parameters).get(function);
}
});
}
}
代码示例来源:origin: ribot/ribot-app-android
public Observable<Void> syncRegisteredBeacons() {
String auth = RibotService.Util.buildAuthorization(mPreferencesHelper.getAccessToken());
return mRibotService.getRegisteredBeacons(auth)
.concatMap(new Func1<List<RegisteredBeacon>, Observable<Void>>() {
@Override
public Observable<Void> call(List<RegisteredBeacon> beacons) {
return mDatabaseHelper.setRegisteredBeacons(beacons);
}
})
.doOnCompleted(postEventSafelyAction(new BusEvent.BeaconsSyncCompleted()));
}
代码示例来源:origin: com.netflix.hystrix/hystrix-metrics-event-stream
/* package-private */ HystrixMetricsStreamServlet(Observable<HystrixDashboardStream.DashboardData> sampleStream, int pausePollerThreadDelayInMs) {
super(sampleStream.concatMap(new Func1<HystrixDashboardStream.DashboardData, Observable<String>>() {
@Override
public Observable<String> call(HystrixDashboardStream.DashboardData dashboardData) {
return Observable.from(SerialHystrixDashboardData.toMultipleJsonStrings(dashboardData));
}
}), pausePollerThreadDelayInMs);
}
代码示例来源:origin: org.hawkular.metrics/hawkular-metrics-core-service
@Override
public <T extends Number> Observable<NamedDataPoint<Double>> findRateData(List<MetricId<T>> ids, long start,
long end, int limit, Order order) {
return Observable.from(ids).concatMap(id -> findRateData(id, start, end, limit, order)
.map(dataPoint -> new NamedDataPoint<>(id.getName(), dataPoint)));
}
代码示例来源:origin: hawkular/hawkular-metrics
@Override
public <T extends Number> Observable<NamedDataPoint<Double>> findRateData(List<MetricId<T>> ids, long start,
long end, int limit, Order order) {
return Observable.from(ids).concatMap(id -> findRateData(id, start, end, limit, order)
.map(dataPoint -> new NamedDataPoint<>(id.getName(), dataPoint)));
}
代码示例来源:origin: hawkular/hawkular-metrics
@Override
public <T> Observable<NamedDataPoint<T>> findDataPoints(String tenantId, MetricType<T> metricType,
String tagFilters, long start, long end, int limit, Order order) {
return findMetricIdentifiersWithFilters(tenantId, metricType, tagFilters)
.concatMap(id -> findDataPoints(id, start, end, limit, order)
.map(dataPoint -> new NamedDataPoint<>(id.getName(), dataPoint)));
}
代码示例来源:origin: com.trunk.rx.json/rxjava-json-core
protected JsonArray(Observable<T> elements) {
super(
Observable.<JsonToken>just(JsonArrayStart.instance())
.concatWith(
elements
.concatMap(jsonElement -> Observable.<JsonToken>just(JsonComma.instance()).concatWith(jsonElement))
.skip(1)
)
.concatWith(Observable.just(JsonArrayEnd.instance()))
);
this.elements = elements;
}
代码示例来源:origin: VictorAlbertos/RxGcm
public Observable<Boolean> sendGcmNotificationRequestingSupply(String title, String body) {
return RxGcm.Notifications.currentToken()
.map(token -> new Payload(token, title, body, TARGET_SUPPLY_GCM))
.concatMap(payload -> apiGcmServer.sendNotification(payload))
.map(gcmResponseServerResponse -> gcmResponseServerResponse.body().success())
.onErrorReturn(throwable -> false);
}
内容来源于网络,如有侵权,请联系作者删除!