Usage and code examples of the rx.Observable.create() method

x33g5p2x, reposted on 2022-01-25 under Other

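This article collects code examples for the Java method rx.Observable.create() and shows how Observable.create() is used in practice. The examples come mainly from GitHub, Stack Overflow, and Maven, were extracted from selected projects, and are intended as reference material. Details of the Observable.create() method:
Package path: rx.Observable
Class name: Observable
Method name: create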

About Observable.create

Returns an Observable that will execute the specified function when a Subscriber subscribes to it.

Write the function you pass to create so that it behaves as an Observable: it should invoke the Subscriber's onNext, onError, and onCompleted methods appropriately.

A well-formed Observable must invoke either the Subscriber's onCompleted method exactly once or its onError method exactly once.

See Rx Design Guidelines (PDF) for detailed information.

Scheduler: create does not operate by default on a particular Scheduler.
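The contract described above can be sketched as follows. This is a minimal illustration assuming RxJava 1.x on the classpath; the class name `CreateContractSketch` and the helper `fromList` are hypothetical names chosen for this example and are not part of the library. The function emits each item with onNext, stops early if the subscriber has unsubscribed, and ends with exactly one terminal call.

```java
import rx.Observable;
import rx.Subscriber;
import rx.observers.TestSubscriber;

import java.util.Arrays;
import java.util.List;

public class CreateContractSketch {

    // Hypothetical helper: emits every element of `items`, then completes.
    static Observable<String> fromList(final List<String> items) {
        return Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {
                try {
                    for (String item : items) {
                        // Stop producing once the downstream is no longer interested.
                        if (subscriber.isUnsubscribed()) {
                            return;
                        }
                        subscriber.onNext(item);
                    }
                } catch (Throwable t) {
                    // Failure path: onError is the only terminal event.
                    if (!subscriber.isUnsubscribed()) {
                        subscriber.onError(t);
                    }
                    return;
                }
                // Success path: onCompleted is the only terminal event.
                if (!subscriber.isUnsubscribed()) {
                    subscriber.onCompleted();
                }
            }
        });
    }

    public static void main(String[] args) {
        TestSubscriber<String> ts = new TestSubscriber<>();
        fromList(Arrays.asList("a", "b", "c")).subscribe(ts);

        ts.assertValues("a", "b", "c");
        ts.assertNoErrors();
        ts.assertCompleted();
        System.out.println(ts.getOnNextEvents());
    }
}
```

Checking isUnsubscribed() before each call is optional for a short synchronous source like this one, but it keeps the function from doing work nobody will observe.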

Code examples

Code example source: jeasonlzy/okhttp-OkGo

@Override
public Observable<Response<T>> adapt(Call<T> call, AdapterParam param) {
  Observable.OnSubscribe<Response<T>> subscribe = AnalysisParams.analysis(call, param);
  return Observable.create(subscribe);
}

Code example source: PipelineAI/pipeline

@Override
protected Observable<Boolean> construct() {
  return Observable.create(new OnSubscribe<Boolean>() {
    @Override
    public void call(Subscriber<? super Boolean> s) {
      action.call();
      s.onNext(true);
      s.onCompleted();
    }
  });
}

Code example source: Netflix/conductor

@Override
public Observable<Message> observe() {
  OnSubscribe<Message> subscriber = getOnSubscribe();
  return Observable.create(subscriber);
}

Code example source: PipelineAI/pipeline

@Override
protected Observable<Boolean> construct() {
  return Observable.create(new OnSubscribe<Boolean>() {
    @Override
    public void call(Subscriber<? super Boolean> s) {
      s.onError(new RuntimeException("onError"));
    }
  }).subscribeOn(userScheduler);
}

Code example source: PipelineAI/pipeline

@Override
protected Observable<Boolean> resumeWithFallback() {
  return Observable.create(new OnSubscribe<Boolean>() {
    @Override
    public void call(Subscriber<? super Boolean> s) {
      results.isContextInitialized.set(HystrixRequestContext.isCurrentThreadInitialized());
      results.originThread.set(Thread.currentThread());
      s.onNext(false);
      s.onCompleted();
    }
  }).subscribeOn(userScheduler);
}

Code example source: PipelineAI/pipeline

@Override
protected Observable<Boolean> construct() {
  return Observable.create(new OnSubscribe<Boolean>() {
    @Override
    public void call(Subscriber<? super Boolean> s) {
      results.isContextInitialized.set(HystrixRequestContext.isCurrentThreadInitialized());
      results.originThread.set(Thread.currentThread());
      s.onNext(true);
      s.onCompleted();
    }
  }).subscribeOn(userScheduler);
}

Code example source: PipelineAI/pipeline

@Override
protected Observable<Boolean> construct() {
  return Observable.create(new OnSubscribe<Boolean>() {
    @Override
    public void call(Subscriber<? super Boolean> s) {
      results.isContextInitialized.set(HystrixRequestContext.isCurrentThreadInitialized());
      results.originThread.set(Thread.currentThread());
      throw new RuntimeException("bad onError");
    }
  }).subscribeOn(userScheduler);
}
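The snippet above throws from inside the function instead of calling onError. The sketch below compares the two styles, assuming RxJava 1.x, where the subscribe path catches non-fatal exceptions thrown by the function and forwards them to the subscriber's onError. The class and method names (`ThrowVersusOnErrorSketch`, `throwing`, `signalling`, `firstErrorMessage`) are hypothetical and exist only for this illustration.

```java
import rx.Observable;
import rx.Subscriber;
import rx.observers.TestSubscriber;

import java.util.List;

public class ThrowVersusOnErrorSketch {

    // Fails by throwing from the OnSubscribe function.
    static Observable<Boolean> throwing() {
        return Observable.create(new Observable.OnSubscribe<Boolean>() {
            @Override
            public void call(Subscriber<? super Boolean> s) {
                throw new RuntimeException("thrown from call");
            }
        });
    }

    // Fails by signalling the error explicitly.
    static Observable<Boolean> signalling() {
        return Observable.create(new Observable.OnSubscribe<Boolean>() {
            @Override
            public void call(Subscriber<? super Boolean> s) {
                s.onError(new RuntimeException("passed to onError"));
            }
        });
    }

    // Subscribes synchronously and returns the message of the first
    // error the subscriber received, or null if there was none.
    static String firstErrorMessage(Observable<Boolean> source) {
        TestSubscriber<Boolean> ts = new TestSubscriber<>();
        source.subscribe(ts);
        List<Throwable> errors = ts.getOnErrorEvents();
        return errors.isEmpty() ? null : errors.get(0).getMessage();
    }

    public static void main(String[] args) {
        System.out.println(firstErrorMessage(throwing()));
        System.out.println(firstErrorMessage(signalling()));
    }
}
```

Both observables end in onError from the subscriber's point of view; calling onError explicitly simply makes the terminal event visible in the code.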

Code example source: PipelineAI/pipeline

@Override
protected Observable<Boolean> construct() {
  return Observable.create(new OnSubscribe<Boolean>() {
    @Override
    public void call(Subscriber<? super Boolean> s) {
      results.isContextInitialized.set(HystrixRequestContext.isCurrentThreadInitialized());
      results.originThread.set(Thread.currentThread());
      s.onError(new RuntimeException("graceful onError"));
    }
  }).subscribeOn(userScheduler);
}

Code example source: PipelineAI/pipeline

@Override
protected Observable<Boolean> construct() {
  return Observable.create(new OnSubscribe<Boolean>() {
    @Override
    public void call(Subscriber<? super Boolean> s) {
      try {
        Thread.sleep(500);
      } catch (InterruptedException e) {
        // ignore the interrupted exception
      }
    }
  }).subscribeOn(userScheduler);
}

Code example source: jeasonlzy/okhttp-OkGo

@Override
public Observable<T> adapt(Call<T> call, AdapterParam param) {
  Observable.OnSubscribe<Response<T>> subscribe = AnalysisParams.analysis(call, param);
  BodyOnSubscribe<T> bodySubscribe = new BodyOnSubscribe<>(subscribe);
  return Observable.create(bodySubscribe);
}

Code example source: jeasonlzy/okhttp-OkGo

@Override
public Observable<Result<T>> adapt(Call<T> call, AdapterParam param) {
  Observable.OnSubscribe<Response<T>> subscribe = AnalysisParams.analysis(call, param);
  ResultOnSubscribe<T> resultSubscribe = new ResultOnSubscribe<>(subscribe);
  return Observable.create(resultSubscribe);
}

Code example source: PipelineAI/pipeline

@Override
protected Observable<Boolean> construct() {
  return Observable.create(new OnSubscribe<Boolean>() {
    @Override
    public void call(Subscriber<? super Boolean> s) {
      try {
        Thread.sleep(executionSleep);
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
      s.onNext(true);
      s.onCompleted();
    }
  }).subscribeOn(Schedulers.io());
}
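Several of the snippets above append subscribeOn(...) to the created Observable. Because create does not operate on any particular Scheduler by default, the function runs on whichever thread subscribes unless subscribeOn moves it. The sketch below illustrates this, assuming RxJava 1.x; `SubscribeOnSketch` and `threadName` are hypothetical names used only here.

```java
import rx.Observable;
import rx.Subscriber;
import rx.schedulers.Schedulers;

public class SubscribeOnSketch {

    // Emits the name of the thread that executed the OnSubscribe function.
    static Observable<String> threadName() {
        return Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> s) {
                s.onNext(Thread.currentThread().getName());
                s.onCompleted();
            }
        });
    }

    public static void main(String[] args) {
        String caller = Thread.currentThread().getName();

        // No scheduler: the function runs on the subscribing (calling) thread.
        String direct = threadName().toBlocking().single();

        // With subscribeOn: the function runs on a thread owned by the io scheduler.
        String onIo = threadName().subscribeOn(Schedulers.io()).toBlocking().single();

        System.out.println("caller=" + caller + ", direct=" + direct + ", io=" + onIo);
    }
}
```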

Code example source: square/retrofit

@Override public Object adapt(Call<R> call) {
  OnSubscribe<Response<R>> callFunc = isAsync
      ? new CallEnqueueOnSubscribe<>(call)
      : new CallExecuteOnSubscribe<>(call);

  OnSubscribe<?> func;
  if (isResult) {
    func = new ResultOnSubscribe<>(callFunc);
  } else if (isBody) {
    func = new BodyOnSubscribe<>(callFunc);
  } else {
    func = callFunc;
  }
  Observable<?> observable = Observable.create(func);

  if (scheduler != null) {
    observable = observable.subscribeOn(scheduler);
  }

  if (isSingle) {
    return observable.toSingle();
  }
  if (isCompletable) {
    return observable.toCompletable();
  }
  return observable;
}
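The adapter above converts the created Observable with toSingle() or toCompletable() when the caller asks for those types. A minimal sketch of the two conversions follows, assuming a late RxJava 1.x release that ships rx.Single and rx.Completable; `SingleAndCompletableSketch` and `answer` are hypothetical names for this example.

```java
import rx.Completable;
import rx.Observable;
import rx.Single;
import rx.Subscriber;

public class SingleAndCompletableSketch {

    // Emits exactly one item and completes, which is what toSingle() expects.
    static Observable<Integer> answer() {
        return Observable.create(new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> s) {
                s.onNext(42);
                s.onCompleted();
            }
        });
    }

    public static void main(String[] args) {
        // Single: carries the one emitted value.
        Single<Integer> single = answer().toSingle();
        Integer value = single.toBlocking().value();

        // Completable: ignores the items and keeps only the terminal event.
        Completable completable = answer().toCompletable();
        Throwable error = completable.get(); // blocks; null means it completed normally

        System.out.println("value=" + value + ", error=" + error);
    }
}
```

toSingle() signals an error if the source emits zero items or more than one, so it only suits functions that call onNext exactly once.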

Code example source: Netflix/zuul

@Override
public Observable<HttpResponseMessage> applyAsync(HttpRequestMessage input)
{
  if (WAIT_FOR_LASTCONTENT.get() && ! input.hasCompleteBody()) {
    // Return an observable that won't complete until after we have received the LastContent from client (ie. that we've
    // received the whole request body), so that we can't potentially corrupt the clients' http state on this connection.
    return Observable.create(subscriber -> {
      ZuulMessage response = this.apply(input);
      ResponseState state = new ResponseState(response, subscriber);
      input.getContext().set(KEY_FOR_SUBSCRIBER, state);
    });
  }
  else {
    return Observable.just(this.apply(input));
  }
}
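In the snippet above the function does not emit anything itself; it appears to park the subscriber so that other code can deliver the response later. The sketch below shows that pattern in isolation, assuming RxJava 1.x. The `Pending` holder is a hypothetical stand-in for the request context and ResponseState used above, not Zuul's actual implementation.

```java
import rx.Observable;
import rx.Subscriber;
import rx.observers.TestSubscriber;

import java.util.concurrent.atomic.AtomicReference;

public class DeferredCompletionSketch {

    // Hypothetical holder that remembers the subscriber until a result is ready.
    static final class Pending<T> {
        private final AtomicReference<Subscriber<? super T>> ref = new AtomicReference<>();

        Observable<T> observable() {
            return Observable.create(new Observable.OnSubscribe<T>() {
                @Override
                public void call(Subscriber<? super T> subscriber) {
                    // Only remember the subscriber; nothing is emitted yet.
                    ref.set(subscriber);
                }
            });
        }

        // Called later, for example once the rest of the input has arrived.
        void complete(T value) {
            Subscriber<? super T> s = ref.getAndSet(null);
            if (s != null && !s.isUnsubscribed()) {
                s.onNext(value);
                s.onCompleted();
            }
        }
    }

    public static void main(String[] args) {
        Pending<String> pending = new Pending<>();
        TestSubscriber<String> ts = new TestSubscriber<>();
        pending.observable().subscribe(ts);

        // Subscribed, but no value and no terminal event so far.
        ts.assertNoValues();
        ts.assertNoTerminalEvent();

        pending.complete("response");
        ts.assertValue("response");
        ts.assertCompleted();
    }
}
```

This holder supports a single subscriber; a second subscription would overwrite the first, so real code would need to decide how to handle that case.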
