Usage and code examples of the io.reactivex.Observable.fromPublisher() method

Reposted by x33g5p2x on 2022-01-25, category: Other

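This article collects code examples of the Java method io.reactivex.Observable.fromPublisher() and shows how Observable.fromPublisher() is used in practice. The examples come mainly from GitHub, Stack Overflow, Maven, and similar platforms, and were extracted from selected projects, so they should serve as useful references. Details of the Observable.fromPublisher() method:
Package: io.reactivex
Class name: Observable
Method name: fromPublisher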

About Observable.fromPublisher

Converts an arbitrary Reactive-Streams Publisher into an Observable.

The Publisher must follow the Reactive-Streams specification. Violating the specification may result in undefined behavior.

If possible, use Observable.create(ObservableOnSubscribe) to create a source-like Observable instead.

Note that even though Publisher appears to be a functional interface, implementing it through a lambda is not recommended, because the specification requires state management that a stateless lambda cannot provide.

Backpressure: The source publisher is consumed in an unbounded fashion without applying any backpressure to it.

Scheduler: fromPublisher does not operate by default on a particular Scheduler.
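
To illustrate the note about state management, here is a minimal sketch of a Publisher that keeps per-subscriber state. It is written against the JDK's java.util.concurrent.Flow interfaces (Java 9+), which mirror the Reactive-Streams ones, so that it runs without extra dependencies; Observable.fromPublisher itself expects an org.reactivestreams.Publisher. The names RangePublisher, RangeSubscription, and RangePublisherDemo are hypothetical and not part of RxJava. Each Subscription tracks outstanding demand, the next value, and cancellation, which is the kind of state a stateless lambda cannot hold. Requesting Long.MAX_VALUE corresponds to the unbounded consumption mentioned in the backpressure note.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical publisher emitting start, start + 1, ..., start + count - 1. */
final class RangePublisher implements Flow.Publisher<Integer> {
    private final int start;
    private final int end; // exclusive

    RangePublisher(int start, int count) {
        this.start = start;
        this.end = start + count; // count is assumed non-negative and small enough not to overflow
    }

    @Override
    public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
        // Every subscriber gets its own Subscription holding that subscriber's state.
        subscriber.onSubscribe(new RangeSubscription(subscriber, start, end));
    }

    private static final class RangeSubscription implements Flow.Subscription {
        private final Flow.Subscriber<? super Integer> downstream;
        private final int end;
        private final AtomicLong requested = new AtomicLong(); // outstanding demand
        private int next;                                      // next value to emit
        private volatile boolean cancelled;

        RangeSubscription(Flow.Subscriber<? super Integer> downstream, int next, int end) {
            this.downstream = downstream;
            this.next = next;
            this.end = end;
        }

        @Override
        public void request(long n) {
            if (cancelled) return;
            if (n <= 0) {
                // The specification calls for an IllegalArgumentException on non-positive demand.
                cancelled = true;
                downstream.onError(new IllegalArgumentException("non-positive request: " + n));
                return;
            }
            // Add to the demand, capping at Long.MAX_VALUE on overflow.
            long previous = requested.getAndAccumulate(n, (a, b) -> a + b < 0 ? Long.MAX_VALUE : a + b);
            // Only the call that raised demand from zero emits, so re-entrant requests do not recurse.
            if (previous == 0) drain(n);
        }

        @Override
        public void cancel() {
            cancelled = true;
        }

        private void drain(long demand) {
            long emitted = 0;
            for (;;) {
                while (emitted != demand && next != end) {
                    if (cancelled) return;
                    downstream.onNext(next++);
                    emitted++;
                }
                if (cancelled) return;
                if (next == end) {
                    cancelled = true; // terminal state: later requests are ignored
                    downstream.onComplete();
                    return;
                }
                // Current demand is used up; subtract it and stop unless more arrived meanwhile.
                demand = requested.get();
                if (demand == emitted) {
                    demand = requested.addAndGet(-emitted);
                    if (demand == 0) return;
                    emitted = 0;
                }
            }
        }
    }
}

final class RangePublisherDemo {
    /** Subscribes with a fixed demand and returns what arrived; RangePublisher emits synchronously. */
    static List<Integer> collect(Flow.Publisher<Integer> source, long demand) {
        List<Integer> items = new ArrayList<>();
        source.subscribe(new Flow.Subscriber<Integer>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.request(demand); }
            @Override public void onNext(Integer item) { items.add(item); }
            @Override public void onError(Throwable t) { /* not expected in this sketch */ }
            @Override public void onComplete() { }
        });
        return items;
    }

    public static void main(String[] args) {
        System.out.println(collect(new RangePublisher(1, 5), Long.MAX_VALUE)); // unbounded demand
        System.out.println(collect(new RangePublisher(1, 5), 2));              // only two items requested
    }
}
```

With unbounded demand the subscriber receives all five values; with a demand of 2 it receives only the first two and the publisher waits for further requests. A lambda such as `subscriber -> ...` has nowhere to keep the demand counter or the cancellation flag between calls, which is why a dedicated Subscription class is used here.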

Code examples

Example source: lettuce-io/lettuce-core

@Override
public io.reactivex.Observable<?> apply(Publisher<?> source) {
  // Adapt any Reactive-Streams Publisher to an RxJava 2 Observable
  return io.reactivex.Observable.fromPublisher(source);
}

Example source: micronaut-projects/micronaut-core

@SuppressWarnings("unchecked")
@Override
public BindingResult<Observable> bind(ArgumentConversionContext<Observable> context, HttpRequest<?> source) {
  Collection<Argument<?>> typeVariables = context.getArgument().getTypeVariables().values();

  // Bind the request body as a Publisher first, carrying over the Observable's type arguments
  BindingResult<Publisher> result = publisherBodyBinder.bind(
    ConversionContext.of(Argument.of(Publisher.class, (Argument[]) typeVariables.toArray(new Argument[0]))),
    source
  );
  if (result.isPresentAndSatisfied()) {
    // Wrap the bound Publisher in an Observable
    return () -> Optional.of(Observable.fromPublisher(result.get()));
  }
  return BindingResult.EMPTY;
}

Example source: resilience4j/resilience4j

@Override
public ObservableSource<T> apply(Observable<T> upstream) {
  return Observable.fromPublisher(downstream -> {
    Flowable<T> flowable = upstream.toFlowable(BackpressureStrategy.BUFFER);
    SubscriptionArbiter sa = new SubscriptionArbiter();
    downstream.onSubscribe(sa);
    RetrySubscriber<T> retrySubscriber = new RetrySubscriber<>(downstream, retry.getRetryConfig().getMaxAttempts(), sa, flowable, retry);
    flowable.subscribe(retrySubscriber);
  });
}

Example source: ReactiveX/RxJava

@Test
public void fromPublisher() {
  Observable.fromPublisher(Flowable.just(1))
  .test()
  .assertResult(1);
}

Example source: akarnokd/RxJava2Extensions

/**
 * Convert this Nono instance into an Observable that only terminates.
 * @param <T> the value type
 * @return the new Observable instance
 */
@SuppressWarnings({ "unchecked", "rawtypes" })
public final <T> Observable<T> toObservable() {
  return (Observable)Observable.fromPublisher(this);
}

Example source: io.gravitee.am.identityprovider/gravitee-am-identityprovider-mongo

@Override
public void afterPropertiesSet() {
  // init users collection
  usersCollection = this.mongoClient.getDatabase(this.configuration.getDatabase()).getCollection(this.configuration.getUsersCollection());
  // create index on username field
  Observable.fromPublisher(usersCollection.createIndex(new Document(configuration.getUsernameField(), 1))).subscribe();
}

Example source: gravitee-io/graviteeio-access-management

@Override
public Single<Page<User>> findByDomain(String domain, int page, int size) {
  // Count all users in the domain; fall back to 0 if the publisher emits nothing
  Single<Long> countOperation = Observable.fromPublisher(usersCollection.countDocuments(eq(FIELD_DOMAIN, domain))).first(0L);
  // Fetch one page of users sorted by username; LinkedHashSet preserves the sort order
  Single<Set<User>> usersOperation = Observable.fromPublisher(usersCollection.find(eq(FIELD_DOMAIN, domain)).sort(new BasicDBObject(FIELD_USERNAME, 1)).skip(size * page).limit(size)).map(this::convert).collect(LinkedHashSet::new, Set::add);
  // Combine both results into a single Page once they are available
  return Single.zip(countOperation, usersOperation, (count, users) -> new Page<>(users, page, count));
}

Example source: io.gravitee.am.identityprovider/gravitee-am-identityprovider-mongo

@Override
public Maybe<User> findByUsername(String username) {
  // Substitute the username for every "?" placeholder. String.replace treats both arguments literally,
  // so characters such as "$" or "\" in the username are not read as regex replacement syntax.
  String rawQuery = this.configuration.getFindUserByUsernameQuery().replace("?", username);
  String jsonQuery = convertToJsonString(rawQuery);
  BsonDocument query = BsonDocument.parse(jsonQuery);
  return Observable.fromPublisher(usersCollection.find(query).first()).firstElement().map(this::convert);
}

Example source: gravitee-io/graviteeio-access-management

private Maybe<RefreshToken> findById(String id) {
  return Observable
      .fromPublisher(refreshTokenCollection.find(eq(FIELD_ID, id)).first())
      .firstElement()
      .map(this::convert);
}

Example source: gravitee-io/graviteeio-access-management

@Override
public Maybe<RefreshToken> findByToken(String token) {
  return Observable
      .fromPublisher(refreshTokenCollection.find(eq(FIELD_TOKEN, token)).first())
      .firstElement()
      .map(this::convert);
}

Example source: gravitee-io/graviteeio-access-management

private Maybe<AccessToken> findById(String id) {
  return Observable
      .fromPublisher(accessTokenCollection.find(eq(FIELD_ID, id)).limit(1).first())
      .firstElement()
      .map(this::convert);
}

Example source: gravitee-io/graviteeio-access-management

@Override
public Maybe<AccessToken> findByToken(String token) {
  return Observable
      .fromPublisher(accessTokenCollection.find(eq(FIELD_TOKEN, token)).limit(1).first())
      .firstElement()
      .map(this::convert);
}

Example source: io.gravitee.am.identityprovider/gravitee-am-identityprovider-mongo

private Maybe<Document> findUserByUsername(String username) {
  MongoCollection<Document> usersCol = this.mongoClient.getDatabase(this.configuration.getDatabase()).getCollection(this.configuration.getUsersCollection());
  // Substitute the username for every "?" placeholder. String.replace treats both arguments literally,
  // so characters such as "$" or "\" in the username are not read as regex replacement syntax.
  String rawQuery = this.configuration.getFindUserByUsernameQuery().replace("?", username);
  String jsonQuery = convertToJsonString(rawQuery);
  BsonDocument query = BsonDocument.parse(jsonQuery);
  return Observable.fromPublisher(usersCol.find(query).first()).firstElement();
}

Example source: gravitee-io/graviteeio-access-management

@Override
public Maybe<User> findByUsernameAndDomain(String domain, String username) {
  return Observable.fromPublisher(
      usersCollection
          .find(and(eq(FIELD_DOMAIN, domain), eq(FIELD_USERNAME, username)))
          .limit(1)
          .first())
      .firstElement()
      .map(this::convert);
}

Example source: gravitee-io/graviteeio-access-management

@Override
public Maybe<Form> findByDomainAndClientAndTemplate(String domain, String client, String template) {
  return Observable.fromPublisher(
      formsCollection.find(
          and(
              eq(FIELD_DOMAIN, domain),
              eq(FIELD_CLIENT, client),
              eq(FIELD_TEMPLATE, template)))
          .first())
      .firstElement().map(this::convert);
}

Example source: gravitee-io/graviteeio-access-management

@Override
public Maybe<User> findByDomainAndUsernameAndSource(String domain, String username, String source) {
  return Observable.fromPublisher(
      usersCollection
          .find(and(eq(FIELD_DOMAIN, domain), eq(FIELD_USERNAME, username), eq(FIELD_SOURCE, source)))
          .limit(1)
          .first())
      .firstElement()
      .map(this::convert);
}
