io.reactivex.Observable.fromPublisher()方法的使用及代码示例

x33g5p2x  于2022-01-25 转载在 其他  
字(8.4k)|赞(0)|评价(0)|浏览(197)

本文整理了Java中io.reactivex.Observable.fromPublisher()方法的一些代码示例,展示了Observable.fromPublisher()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Observable.fromPublisher()方法的具体详情如下:
包路径:io.reactivex.Observable
类名称:Observable
方法名:fromPublisher

Observable.fromPublisher介绍

[英]Converts an arbitrary Reactive-Streams Publisher into an Observable.

The Publisher must follow the Reactive-Streams specification. Violating the specification may result in undefined behavior.

If possible, use #create(ObservableOnSubscribe) to create a source-like Observable instead.

Note that even though Publisher appears to be a functional interface, it is not recommended to implement it through a lambda as the specification requires state management that is not achievable with a stateless lambda. Backpressure: The source publisher is consumed in an unbounded fashion without applying any backpressure to it. Scheduler: fromPublisher does not operate by default on a particular Scheduler.
[中]将任意反应流发布器转换为可观察流。
发布者必须遵循Reactive-Streams specification。违反规范可能会导致未定义的行为。
如果可能,使用#create(observateOnSubscribe)来创建类似observate的源。
请注意,尽管Publisher看起来是一个功能接口,但不建议通过lambda实现它,因为规范要求使用无状态lambda无法实现状态管理。背压:源发布服务器以无限制的方式使用,而不向其施加任何背压。调度器:默认情况下,fromPublisher不会在特定的调度器上运行。

代码示例

代码示例来源:origin: lettuce-io/lettuce-core

  1. @Override
  2. public io.reactivex.Observable<?> apply(Publisher<?> source) {
  3. return io.reactivex.Observable.fromPublisher(source);
  4. }
  5. }

代码示例来源:origin: micronaut-projects/micronaut-core

  1. @SuppressWarnings("unchecked")
  2. @Override
  3. public BindingResult<Observable> bind(ArgumentConversionContext<Observable> context, HttpRequest<?> source) {
  4. Collection<Argument<?>> typeVariables = context.getArgument().getTypeVariables().values();
  5. BindingResult<Publisher> result = publisherBodyBinder.bind(
  6. ConversionContext.of(Argument.of(Publisher.class, (Argument[]) typeVariables.toArray(new Argument[0]))),
  7. source
  8. );
  9. if (result.isPresentAndSatisfied()) {
  10. return () -> Optional.of(Observable.fromPublisher(result.get()));
  11. }
  12. return BindingResult.EMPTY;
  13. }
  14. }

代码示例来源:origin: resilience4j/resilience4j

  1. @Override
  2. public ObservableSource<T> apply(Observable<T> upstream) {
  3. return Observable.fromPublisher(downstream -> {
  4. Flowable<T> flowable = upstream.toFlowable(BackpressureStrategy.BUFFER);
  5. SubscriptionArbiter sa = new SubscriptionArbiter();
  6. downstream.onSubscribe(sa);
  7. RetrySubscriber<T> retrySubscriber = new RetrySubscriber<>(downstream, retry.getRetryConfig().getMaxAttempts(), sa, flowable, retry);
  8. flowable.subscribe(retrySubscriber);
  9. });
  10. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void fromPublisher() {
  3. Observable.fromPublisher(Flowable.just(1))
  4. .test()
  5. .assertResult(1);
  6. }

代码示例来源:origin: akarnokd/RxJava2Extensions

  1. /**
  2. * Convert this Nono instance into an Observable that only terminates.
  3. * @param <T> the value type
  4. * @return the new Observable instance
  5. */
  6. @SuppressWarnings({ "unchecked", "rawtypes" })
  7. public final <T> Observable<T> toObservable() {
  8. return (Observable)Observable.fromPublisher(this);
  9. }

代码示例来源:origin: io.lettuce/lettuce-core

  1. @Override
  2. public io.reactivex.Observable<?> apply(Publisher<?> source) {
  3. return io.reactivex.Observable.fromPublisher(source);
  4. }
  5. }

代码示例来源:origin: com.github.akarnokd/rxjava2-extensions

  1. /**
  2. * Convert this Nono instance into an Observable that only terminates.
  3. * @param <T> the value type
  4. * @return the new Observable instance
  5. */
  6. @SuppressWarnings({ "unchecked", "rawtypes" })
  7. public final <T> Observable<T> toObservable() {
  8. return (Observable)Observable.fromPublisher(this);
  9. }

代码示例来源:origin: io.gravitee.am.identityprovider/gravitee-am-identityprovider-mongo

  1. @Override
  2. public void afterPropertiesSet() {
  3. // init users collection
  4. usersCollection = this.mongoClient.getDatabase(this.configuration.getDatabase()).getCollection(this.configuration.getUsersCollection());
  5. // create index on username field
  6. Observable.fromPublisher(usersCollection.createIndex(new Document(configuration.getUsernameField(), 1))).subscribe();
  7. }

代码示例来源:origin: gravitee-io/graviteeio-access-management

  1. @Override
  2. public Single<Page<User>> findByDomain(String domain, int page, int size) {
  3. Single<Long> countOperation = Observable.fromPublisher(usersCollection.countDocuments(eq(FIELD_DOMAIN, domain))).first(0l);
  4. Single<Set<User>> usersOperation = Observable.fromPublisher(usersCollection.find(eq(FIELD_DOMAIN, domain)).sort(new BasicDBObject(FIELD_USERNAME, 1)).skip(size * page).limit(size)).map(this::convert).collect(LinkedHashSet::new, Set::add);
  5. return Single.zip(countOperation, usersOperation, (count, users) -> new Page<>(users, page, count));
  6. }

代码示例来源:origin: io.github.resilience4j/resilience4j-rxjava2

  1. @Override
  2. public ObservableSource<T> apply(Observable<T> upstream) {
  3. return Observable.fromPublisher(downstream -> {
  4. Flowable<T> flowable = upstream.toFlowable(BackpressureStrategy.BUFFER);
  5. SubscriptionArbiter sa = new SubscriptionArbiter();
  6. downstream.onSubscribe(sa);
  7. RetrySubscriber<T> retrySubscriber = new RetrySubscriber<>(downstream, retry.getRetryConfig().getMaxAttempts(), sa, flowable, retry);
  8. flowable.subscribe(retrySubscriber);
  9. });
  10. }

代码示例来源:origin: io.gravitee.am.identityprovider/gravitee-am-identityprovider-mongo

  1. @Override
  2. public Maybe<User> findByUsername(String username) {
  3. String rawQuery = this.configuration.getFindUserByUsernameQuery().replaceAll("\\?", username);
  4. String jsonQuery = convertToJsonString(rawQuery);
  5. BsonDocument query = BsonDocument.parse(jsonQuery);
  6. return Observable.fromPublisher(usersCollection.find(query).first()).firstElement().map(this::convert);
  7. }

代码示例来源:origin: gravitee-io/graviteeio-access-management

  1. private Maybe<RefreshToken> findById(String id) {
  2. return Observable
  3. .fromPublisher(refreshTokenCollection.find(eq(FIELD_ID, id)).first())
  4. .firstElement()
  5. .map(this::convert);
  6. }

代码示例来源:origin: gravitee-io/graviteeio-access-management

  1. @Override
  2. public Maybe<RefreshToken> findByToken(String token) {
  3. return Observable
  4. .fromPublisher(refreshTokenCollection.find(eq(FIELD_TOKEN, token)).first())
  5. .firstElement()
  6. .map(this::convert);
  7. }

代码示例来源:origin: gravitee-io/graviteeio-access-management

  1. private Maybe<AccessToken> findById(String id) {
  2. return Observable
  3. .fromPublisher(accessTokenCollection.find(eq(FIELD_ID, id)).limit(1).first())
  4. .firstElement()
  5. .map(this::convert);
  6. }

代码示例来源:origin: gravitee-io/graviteeio-access-management

  1. @Override
  2. public Maybe<AccessToken> findByToken(String token) {
  3. return Observable
  4. .fromPublisher(accessTokenCollection.find(eq(FIELD_TOKEN, token)).limit(1).first())
  5. .firstElement()
  6. .map(this::convert);
  7. }

代码示例来源:origin: io.gravitee.am.identityprovider/gravitee-am-identityprovider-mongo

  1. private Maybe<Document> findUserByUsername(String username) {
  2. MongoCollection<Document> usersCol = this.mongoClient.getDatabase(this.configuration.getDatabase()).getCollection(this.configuration.getUsersCollection());
  3. String rawQuery = this.configuration.getFindUserByUsernameQuery().replaceAll("\\?", username);
  4. String jsonQuery = convertToJsonString(rawQuery);
  5. BsonDocument query = BsonDocument.parse(jsonQuery);
  6. return Observable.fromPublisher(usersCol.find(query).first()).firstElement();
  7. }

代码示例来源:origin: gravitee-io/graviteeio-access-management

  1. private Maybe<Document> findUserByUsername(String username) {
  2. MongoCollection<Document> usersCol = this.mongoClient.getDatabase(this.configuration.getDatabase()).getCollection(this.configuration.getUsersCollection());
  3. String rawQuery = this.configuration.getFindUserByUsernameQuery().replaceAll("\\?", username);
  4. String jsonQuery = convertToJsonString(rawQuery);
  5. BsonDocument query = BsonDocument.parse(jsonQuery);
  6. return Observable.fromPublisher(usersCol.find(query).first()).firstElement();
  7. }

代码示例来源:origin: gravitee-io/graviteeio-access-management

  1. @Override
  2. public Maybe<User> findByUsernameAndDomain(String domain, String username) {
  3. return Observable.fromPublisher(
  4. usersCollection
  5. .find(and(eq(FIELD_DOMAIN, domain), eq(FIELD_USERNAME, username)))
  6. .limit(1)
  7. .first())
  8. .firstElement()
  9. .map(this::convert);
  10. }

代码示例来源:origin: gravitee-io/graviteeio-access-management

  1. @Override
  2. public Maybe<Form> findByDomainAndClientAndTemplate(String domain, String client, String template) {
  3. return Observable.fromPublisher(
  4. formsCollection.find(
  5. and(
  6. eq(FIELD_DOMAIN, domain),
  7. eq(FIELD_CLIENT, client),
  8. eq(FIELD_TEMPLATE, template)))
  9. .first())
  10. .firstElement().map(this::convert);
  11. }

代码示例来源:origin: gravitee-io/graviteeio-access-management

  1. @Override
  2. public Maybe<User> findByDomainAndUsernameAndSource(String domain, String username, String source) {
  3. return Observable.fromPublisher(
  4. usersCollection
  5. .find(and(eq(FIELD_DOMAIN, domain), eq(FIELD_USERNAME, username), eq(FIELD_SOURCE, source)))
  6. .limit(1)
  7. .first())
  8. .firstElement()
  9. .map(this::convert);
  10. }

相关文章

Observable类方法