本文整理了Java中rx.Observable.create()
方法的一些代码示例,展示了Observable.create()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Observable.create()
方法的具体详情如下:
包路径:rx.Observable
类名称:Observable
方法名:create
[英]Returns an Observable that will execute the specified function when a Subscriber subscribes to it.
Write the function you pass to create so that it behaves as an Observable: It should invoke the Subscriber's Subscriber#onNext, Subscriber#onError, and Subscriber#onCompleted methods appropriately.
A well-formed Observable must invoke either the Subscriber's onCompleted method exactly once or its onError method exactly once.
See Rx Design Guidelines (PDF) for detailed information. Scheduler: create does not operate by default on a particular Scheduler.
[中]返回当订阅者订阅时将执行指定函数的可观察对象。
编写要创建的函数,使其表现为可观察的:它应该适当地调用订阅服务器的订阅服务器#onNext、订阅服务器#onError和订阅服务器#onCompleted方法。
格式良好的Observable必须只调用订阅者的onCompleted方法一次,或者只调用其onError方法一次。
有关详细信息,请参见{$0$}。调度程序:默认情况下,创建不会在特定调度程序上运行。
代码示例来源:origin: jeasonlzy/okhttp-OkGo
@Override
public Observable<Response<T>> adapt(Call<T> call, AdapterParam param) {
Observable.OnSubscribe<Response<T>> subscribe = AnalysisParams.analysis(call, param);
return Observable.create(subscribe);
}
}
代码示例来源:origin: PipelineAI/pipeline
@Override
protected Observable<Boolean> construct() {
return Observable.create(new OnSubscribe<Boolean>() {
@Override
public void call(Subscriber<? super Boolean> s) {
action.call();
s.onNext(true);
s.onCompleted();
}
});
}
}
代码示例来源:origin: Netflix/conductor
@Override
public Observable<Message> observe() {
OnSubscribe<Message> subscriber = getOnSubscribe();
return Observable.create(subscriber);
}
代码示例来源:origin: PipelineAI/pipeline
@Override
protected Observable<Boolean> construct() {
return Observable.create(new OnSubscribe<Boolean>() {
@Override
public void call(Subscriber<? super Boolean> s) {
s.onError(new RuntimeException("onError"));
}
}).subscribeOn(userScheduler);
}
代码示例来源:origin: PipelineAI/pipeline
@Override
protected Observable<Boolean> resumeWithFallback() {
return Observable.create(new OnSubscribe<Boolean>() {
@Override
public void call(Subscriber<? super Boolean> s) {
results.isContextInitialized.set(HystrixRequestContext.isCurrentThreadInitialized());
results.originThread.set(Thread.currentThread());
s.onNext(false);
s.onCompleted();
}
}).subscribeOn(userScheduler);
}
代码示例来源:origin: PipelineAI/pipeline
@Override
protected Observable<Boolean> construct() {
return Observable.create(new OnSubscribe<Boolean>() {
@Override
public void call(Subscriber<? super Boolean> s) {
results.isContextInitialized.set(HystrixRequestContext.isCurrentThreadInitialized());
results.originThread.set(Thread.currentThread());
s.onNext(true);
s.onCompleted();
}
}).subscribeOn(userScheduler);
}
代码示例来源:origin: PipelineAI/pipeline
@Override
protected Observable<Boolean> resumeWithFallback() {
return Observable.create(new OnSubscribe<Boolean>() {
@Override
public void call(Subscriber<? super Boolean> s) {
results.isContextInitialized.set(HystrixRequestContext.isCurrentThreadInitialized());
results.originThread.set(Thread.currentThread());
s.onNext(false);
s.onCompleted();
}
}).subscribeOn(userScheduler);
}
代码示例来源:origin: PipelineAI/pipeline
@Override
protected Observable<Boolean> construct() {
return Observable.create(new OnSubscribe<Boolean>() {
@Override
public void call(Subscriber<? super Boolean> s) {
results.isContextInitialized.set(HystrixRequestContext.isCurrentThreadInitialized());
results.originThread.set(Thread.currentThread());
throw new RuntimeException("bad onError");
}
}).subscribeOn(userScheduler);
}
代码示例来源:origin: PipelineAI/pipeline
@Override
protected Observable<Boolean> resumeWithFallback() {
return Observable.create(new OnSubscribe<Boolean>() {
@Override
public void call(Subscriber<? super Boolean> s) {
results.isContextInitialized.set(HystrixRequestContext.isCurrentThreadInitialized());
results.originThread.set(Thread.currentThread());
s.onNext(false);
s.onCompleted();
}
}).subscribeOn(userScheduler);
}
代码示例来源:origin: PipelineAI/pipeline
@Override
protected Observable<Boolean> resumeWithFallback() {
return Observable.create(new OnSubscribe<Boolean>() {
@Override
public void call(Subscriber<? super Boolean> s) {
results.isContextInitialized.set(HystrixRequestContext.isCurrentThreadInitialized());
results.originThread.set(Thread.currentThread());
s.onNext(false);
s.onCompleted();
}
}).subscribeOn(userScheduler);
}
代码示例来源:origin: PipelineAI/pipeline
@Override
protected Observable<Boolean> construct() {
return Observable.create(new OnSubscribe<Boolean>() {
@Override
public void call(Subscriber<? super Boolean> s) {
results.isContextInitialized.set(HystrixRequestContext.isCurrentThreadInitialized());
results.originThread.set(Thread.currentThread());
s.onError(new RuntimeException("graceful onError"));
}
}).subscribeOn(userScheduler);
}
代码示例来源:origin: PipelineAI/pipeline
@Override
protected Observable<Boolean> construct() {
return Observable.create(new OnSubscribe<Boolean>() {
@Override
public void call(Subscriber<? super Boolean> s) {
s.onError(new RuntimeException("onError"));
}
}).subscribeOn(userScheduler);
}
代码示例来源:origin: PipelineAI/pipeline
@Override
protected Observable<Boolean> construct() {
return Observable.create(new OnSubscribe<Boolean>() {
@Override
public void call(Subscriber<? super Boolean> s) {
s.onError(new RuntimeException("onError"));
}
}).subscribeOn(userScheduler);
}
代码示例来源:origin: PipelineAI/pipeline
@Override
protected Observable<Boolean> construct() {
return Observable.create(new OnSubscribe<Boolean>() {
@Override
public void call(Subscriber<? super Boolean> s) {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
// ignore the interrupted exception
}
}
}).subscribeOn(userScheduler);
}
代码示例来源:origin: jeasonlzy/okhttp-OkGo
@Override
public Observable<T> adapt(Call<T> call, AdapterParam param) {
Observable.OnSubscribe<Response<T>> subscribe = AnalysisParams.analysis(call, param);
BodyOnSubscribe<T> bodySubscribe = new BodyOnSubscribe<>(subscribe);
return Observable.create(bodySubscribe);
}
}
代码示例来源:origin: jeasonlzy/okhttp-OkGo
@Override
public Observable<Result<T>> adapt(Call<T> call, AdapterParam param) {
Observable.OnSubscribe<Response<T>> subscribe = AnalysisParams.analysis(call, param);
ResultOnSubscribe<T> resultSubscribe = new ResultOnSubscribe<>(subscribe);
return Observable.create(resultSubscribe);
}
}
代码示例来源:origin: PipelineAI/pipeline
@Override
protected Observable<Boolean> construct() {
return Observable.create(new OnSubscribe<Boolean>() {
@Override
public void call(Subscriber<? super Boolean> s) {
try {
Thread.sleep(executionSleep);
} catch (InterruptedException e) {
e.printStackTrace();
}
s.onNext(true);
s.onCompleted();
}
}).subscribeOn(Schedulers.io());
}
代码示例来源:origin: square/retrofit
@Override public Object adapt(Call<R> call) {
OnSubscribe<Response<R>> callFunc = isAsync
? new CallEnqueueOnSubscribe<>(call)
: new CallExecuteOnSubscribe<>(call);
OnSubscribe<?> func;
if (isResult) {
func = new ResultOnSubscribe<>(callFunc);
} else if (isBody) {
func = new BodyOnSubscribe<>(callFunc);
} else {
func = callFunc;
}
Observable<?> observable = Observable.create(func);
if (scheduler != null) {
observable = observable.subscribeOn(scheduler);
}
if (isSingle) {
return observable.toSingle();
}
if (isCompletable) {
return observable.toCompletable();
}
return observable;
}
}
代码示例来源:origin: Netflix/zuul
@Override
public Observable<HttpResponseMessage> applyAsync(HttpRequestMessage input)
{
if (WAIT_FOR_LASTCONTENT.get() && ! input.hasCompleteBody()) {
// Return an observable that won't complete until after we have received the LastContent from client (ie. that we've
// received the whole request body), so that we can't potentially corrupt the clients' http state on this connection.
return Observable.create(subscriber -> {
ZuulMessage response = this.apply(input);
ResponseState state = new ResponseState(response, subscriber);
input.getContext().set(KEY_FOR_SUBSCRIBER, state);
});
}
else {
return Observable.just(this.apply(input));
}
}
代码示例来源:origin: Netflix/zuul
@Override
public Observable<HttpResponseMessage> applyAsync(HttpRequestMessage input)
{
if (WAIT_FOR_LASTCONTENT.get() && ! input.hasCompleteBody()) {
// Return an observable that won't complete until after we have received the LastContent from client (ie. that we've
// received the whole request body), so that we can't potentially corrupt the clients' http state on this connection.
return Observable.create(subscriber -> {
ZuulMessage response = this.apply(input);
ResponseState state = new ResponseState(response, subscriber);
input.getContext().set(KEY_FOR_SUBSCRIBER, state);
});
}
else {
return Observable.just(this.apply(input));
}
}
内容来源于网络,如有侵权,请联系作者删除!