rx.Observable.create()方法的使用及代码示例

x33g5p2x  于2022-01-25 转载在 其他  
字(8.5k)|赞(0)|评价(0)|浏览(250)

本文整理了Java中rx.Observable.create()方法的一些代码示例,展示了Observable.create()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Observable.create()方法的具体详情如下:
包路径:rx.Observable
类名称:Observable
方法名:create

Observable.create介绍

[英]Returns an Observable that will execute the specified function when a Subscriber subscribes to it.

Write the function you pass to create so that it behaves as an Observable: It should invoke the Subscriber's Subscriber#onNext, Subscriber#onError, and Subscriber#onCompleted methods appropriately.

A well-formed Observable must invoke either the Subscriber's onCompleted method exactly once or its onError method exactly once.

See Rx Design Guidelines (PDF) for detailed information. Scheduler: create does not operate by default on a particular Scheduler.
[中]返回当订阅者订阅时将执行指定函数的可观察对象。
编写要创建的函数,使其表现为可观察的:它应该适当地调用订阅服务器的订阅服务器#onNext、订阅服务器#onError和订阅服务器#onCompleted方法。
格式良好的Observable必须只调用订阅者的onCompleted方法一次,或者只调用其onError方法一次。
有关详细信息,请参见{$0$}。调度程序:默认情况下,创建不会在特定调度程序上运行。

代码示例

代码示例来源:origin: jeasonlzy/okhttp-OkGo

  1. @Override
  2. public Observable<Response<T>> adapt(Call<T> call, AdapterParam param) {
  3. Observable.OnSubscribe<Response<T>> subscribe = AnalysisParams.analysis(call, param);
  4. return Observable.create(subscribe);
  5. }
  6. }

代码示例来源:origin: PipelineAI/pipeline

  1. @Override
  2. protected Observable<Boolean> construct() {
  3. return Observable.create(new OnSubscribe<Boolean>() {
  4. @Override
  5. public void call(Subscriber<? super Boolean> s) {
  6. action.call();
  7. s.onNext(true);
  8. s.onCompleted();
  9. }
  10. });
  11. }
  12. }

代码示例来源:origin: Netflix/conductor

  1. @Override
  2. public Observable<Message> observe() {
  3. OnSubscribe<Message> subscriber = getOnSubscribe();
  4. return Observable.create(subscriber);
  5. }

代码示例来源:origin: PipelineAI/pipeline

  1. @Override
  2. protected Observable<Boolean> construct() {
  3. return Observable.create(new OnSubscribe<Boolean>() {
  4. @Override
  5. public void call(Subscriber<? super Boolean> s) {
  6. s.onError(new RuntimeException("onError"));
  7. }
  8. }).subscribeOn(userScheduler);
  9. }

代码示例来源:origin: PipelineAI/pipeline

  1. @Override
  2. protected Observable<Boolean> resumeWithFallback() {
  3. return Observable.create(new OnSubscribe<Boolean>() {
  4. @Override
  5. public void call(Subscriber<? super Boolean> s) {
  6. results.isContextInitialized.set(HystrixRequestContext.isCurrentThreadInitialized());
  7. results.originThread.set(Thread.currentThread());
  8. s.onNext(false);
  9. s.onCompleted();
  10. }
  11. }).subscribeOn(userScheduler);
  12. }

代码示例来源:origin: PipelineAI/pipeline

  1. @Override
  2. protected Observable<Boolean> construct() {
  3. return Observable.create(new OnSubscribe<Boolean>() {
  4. @Override
  5. public void call(Subscriber<? super Boolean> s) {
  6. results.isContextInitialized.set(HystrixRequestContext.isCurrentThreadInitialized());
  7. results.originThread.set(Thread.currentThread());
  8. s.onNext(true);
  9. s.onCompleted();
  10. }
  11. }).subscribeOn(userScheduler);
  12. }

代码示例来源:origin: PipelineAI/pipeline

  1. @Override
  2. protected Observable<Boolean> resumeWithFallback() {
  3. return Observable.create(new OnSubscribe<Boolean>() {
  4. @Override
  5. public void call(Subscriber<? super Boolean> s) {
  6. results.isContextInitialized.set(HystrixRequestContext.isCurrentThreadInitialized());
  7. results.originThread.set(Thread.currentThread());
  8. s.onNext(false);
  9. s.onCompleted();
  10. }
  11. }).subscribeOn(userScheduler);
  12. }

代码示例来源:origin: PipelineAI/pipeline

  1. @Override
  2. protected Observable<Boolean> construct() {
  3. return Observable.create(new OnSubscribe<Boolean>() {
  4. @Override
  5. public void call(Subscriber<? super Boolean> s) {
  6. results.isContextInitialized.set(HystrixRequestContext.isCurrentThreadInitialized());
  7. results.originThread.set(Thread.currentThread());
  8. throw new RuntimeException("bad onError");
  9. }
  10. }).subscribeOn(userScheduler);
  11. }

代码示例来源:origin: PipelineAI/pipeline

  1. @Override
  2. protected Observable<Boolean> resumeWithFallback() {
  3. return Observable.create(new OnSubscribe<Boolean>() {
  4. @Override
  5. public void call(Subscriber<? super Boolean> s) {
  6. results.isContextInitialized.set(HystrixRequestContext.isCurrentThreadInitialized());
  7. results.originThread.set(Thread.currentThread());
  8. s.onNext(false);
  9. s.onCompleted();
  10. }
  11. }).subscribeOn(userScheduler);
  12. }

代码示例来源:origin: PipelineAI/pipeline

  1. @Override
  2. protected Observable<Boolean> resumeWithFallback() {
  3. return Observable.create(new OnSubscribe<Boolean>() {
  4. @Override
  5. public void call(Subscriber<? super Boolean> s) {
  6. results.isContextInitialized.set(HystrixRequestContext.isCurrentThreadInitialized());
  7. results.originThread.set(Thread.currentThread());
  8. s.onNext(false);
  9. s.onCompleted();
  10. }
  11. }).subscribeOn(userScheduler);
  12. }

代码示例来源:origin: PipelineAI/pipeline

  1. @Override
  2. protected Observable<Boolean> construct() {
  3. return Observable.create(new OnSubscribe<Boolean>() {
  4. @Override
  5. public void call(Subscriber<? super Boolean> s) {
  6. results.isContextInitialized.set(HystrixRequestContext.isCurrentThreadInitialized());
  7. results.originThread.set(Thread.currentThread());
  8. s.onError(new RuntimeException("graceful onError"));
  9. }
  10. }).subscribeOn(userScheduler);
  11. }

代码示例来源:origin: PipelineAI/pipeline

  1. @Override
  2. protected Observable<Boolean> construct() {
  3. return Observable.create(new OnSubscribe<Boolean>() {
  4. @Override
  5. public void call(Subscriber<? super Boolean> s) {
  6. s.onError(new RuntimeException("onError"));
  7. }
  8. }).subscribeOn(userScheduler);
  9. }

代码示例来源:origin: PipelineAI/pipeline

  1. @Override
  2. protected Observable<Boolean> construct() {
  3. return Observable.create(new OnSubscribe<Boolean>() {
  4. @Override
  5. public void call(Subscriber<? super Boolean> s) {
  6. s.onError(new RuntimeException("onError"));
  7. }
  8. }).subscribeOn(userScheduler);
  9. }

代码示例来源:origin: PipelineAI/pipeline

  1. @Override
  2. protected Observable<Boolean> construct() {
  3. return Observable.create(new OnSubscribe<Boolean>() {
  4. @Override
  5. public void call(Subscriber<? super Boolean> s) {
  6. try {
  7. Thread.sleep(500);
  8. } catch (InterruptedException e) {
  9. // ignore the interrupted exception
  10. }
  11. }
  12. }).subscribeOn(userScheduler);
  13. }

代码示例来源:origin: jeasonlzy/okhttp-OkGo

  1. @Override
  2. public Observable<T> adapt(Call<T> call, AdapterParam param) {
  3. Observable.OnSubscribe<Response<T>> subscribe = AnalysisParams.analysis(call, param);
  4. BodyOnSubscribe<T> bodySubscribe = new BodyOnSubscribe<>(subscribe);
  5. return Observable.create(bodySubscribe);
  6. }
  7. }

代码示例来源:origin: jeasonlzy/okhttp-OkGo

  1. @Override
  2. public Observable<Result<T>> adapt(Call<T> call, AdapterParam param) {
  3. Observable.OnSubscribe<Response<T>> subscribe = AnalysisParams.analysis(call, param);
  4. ResultOnSubscribe<T> resultSubscribe = new ResultOnSubscribe<>(subscribe);
  5. return Observable.create(resultSubscribe);
  6. }
  7. }

代码示例来源:origin: PipelineAI/pipeline

  1. @Override
  2. protected Observable<Boolean> construct() {
  3. return Observable.create(new OnSubscribe<Boolean>() {
  4. @Override
  5. public void call(Subscriber<? super Boolean> s) {
  6. try {
  7. Thread.sleep(executionSleep);
  8. } catch (InterruptedException e) {
  9. e.printStackTrace();
  10. }
  11. s.onNext(true);
  12. s.onCompleted();
  13. }
  14. }).subscribeOn(Schedulers.io());
  15. }

代码示例来源:origin: square/retrofit

  1. @Override public Object adapt(Call<R> call) {
  2. OnSubscribe<Response<R>> callFunc = isAsync
  3. ? new CallEnqueueOnSubscribe<>(call)
  4. : new CallExecuteOnSubscribe<>(call);
  5. OnSubscribe<?> func;
  6. if (isResult) {
  7. func = new ResultOnSubscribe<>(callFunc);
  8. } else if (isBody) {
  9. func = new BodyOnSubscribe<>(callFunc);
  10. } else {
  11. func = callFunc;
  12. }
  13. Observable<?> observable = Observable.create(func);
  14. if (scheduler != null) {
  15. observable = observable.subscribeOn(scheduler);
  16. }
  17. if (isSingle) {
  18. return observable.toSingle();
  19. }
  20. if (isCompletable) {
  21. return observable.toCompletable();
  22. }
  23. return observable;
  24. }
  25. }

代码示例来源:origin: Netflix/zuul

  1. @Override
  2. public Observable<HttpResponseMessage> applyAsync(HttpRequestMessage input)
  3. {
  4. if (WAIT_FOR_LASTCONTENT.get() && ! input.hasCompleteBody()) {
  5. // Return an observable that won't complete until after we have received the LastContent from client (ie. that we've
  6. // received the whole request body), so that we can't potentially corrupt the clients' http state on this connection.
  7. return Observable.create(subscriber -> {
  8. ZuulMessage response = this.apply(input);
  9. ResponseState state = new ResponseState(response, subscriber);
  10. input.getContext().set(KEY_FOR_SUBSCRIBER, state);
  11. });
  12. }
  13. else {
  14. return Observable.just(this.apply(input));
  15. }
  16. }

代码示例来源:origin: Netflix/zuul

  1. @Override
  2. public Observable<HttpResponseMessage> applyAsync(HttpRequestMessage input)
  3. {
  4. if (WAIT_FOR_LASTCONTENT.get() && ! input.hasCompleteBody()) {
  5. // Return an observable that won't complete until after we have received the LastContent from client (ie. that we've
  6. // received the whole request body), so that we can't potentially corrupt the clients' http state on this connection.
  7. return Observable.create(subscriber -> {
  8. ZuulMessage response = this.apply(input);
  9. ResponseState state = new ResponseState(response, subscriber);
  10. input.getContext().set(KEY_FOR_SUBSCRIBER, state);
  11. });
  12. }
  13. else {
  14. return Observable.just(this.apply(input));
  15. }
  16. }

相关文章

Observable类方法