Usage and code examples for the io.reactivex.subjects.Subject.filter() method

x33g5p2x, reposted on 2022-01-30 under "Other"

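This article collects code examples of the Java method io.reactivex.subjects.Subject.filter() and shows how Subject.filter() is used in practice. The examples come mainly from GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they serve as practical references. Details of the Subject.filter() method:
Fully qualified class: io.reactivex.subjects.Subject
Class name: Subject
Method name: filter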

Introduction to Subject.filter

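No description is available. Subject does not declare filter itself; it inherits the method from io.reactivex.Observable, which Subject extends.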
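
As a rough illustration of what filtering a subject means, here is a minimal sketch that uses only the Java standard library. MiniSubject is a hypothetical stand-in, not RxJava's Subject: it models only the idea that filter returns a new stream which forwards the values for which the predicate returns true. Unlike RxJava, this sketch subscribes to the upstream eagerly and ignores errors, completion, and disposal.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

/** Hypothetical, stdlib-only stand-in for a hot subject; NOT RxJava's Subject. */
final class MiniSubject<T> {
    private final List<Consumer<? super T>> observers = new ArrayList<>();

    /** Registers an observer; it only sees values pushed after this call. */
    public void subscribe(Consumer<? super T> observer) {
        observers.add(observer);
    }

    /** Pushes a value to every registered observer. */
    public void onNext(T value) {
        for (Consumer<? super T> observer : observers) {
            observer.accept(value);
        }
    }

    /** Returns a downstream source that forwards only values matching the predicate. */
    public MiniSubject<T> filter(Predicate<? super T> predicate) {
        MiniSubject<T> downstream = new MiniSubject<>();
        // Assumption of this sketch: subscribe upstream immediately (RxJava does this lazily).
        subscribe(value -> {
            if (predicate.test(value)) {
                downstream.onNext(value);
            }
        });
        return downstream;
    }

    /** Demo: pushes 1..5 and collects the even values seen downstream. */
    public static List<Integer> demo() {
        MiniSubject<Integer> subject = new MiniSubject<>();
        List<Integer> seen = new ArrayList<>();
        subject.filter(n -> n % 2 == 0).subscribe(seen::add);
        for (int i = 1; i <= 5; i++) {
            subject.onNext(i);
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [2, 4]
    }
}
```

In RxJava itself the corresponding call chain has the same shape, subject.filter(predicate).subscribe(consumer), with the predicate and consumer types taken from io.reactivex.functions rather than java.util.function.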

Code examples

Code example source: origin: square/sqlbrite

@CheckResult @NonNull
private QueryObservable createQuery(DatabaseQuery query) {
 if (transactions.get() != null) {
  throw new IllegalStateException("Cannot create observable query in transaction. "
    + "Use query() for a query inside a transaction.");
 }
 return triggers //
   .filter(query) // DatabaseQuery filters triggers to only the tables we care about.
   .map(query) // DatabaseQuery maps to itself to save an allocation.
   .startWith(query) //
   .observeOn(scheduler) //
   .compose(queryTransformer) // Apply the user's query transformer.
   .doOnSubscribe(ensureNotInTransaction)
   .to(QUERY_OBSERVABLE);
}

Code example source: origin: requery/requery

static <T> Observable<ReactiveResult<T>> toObservableResult(final ReactiveResult<T> result) {
    final QueryElement<?> element = result.unwrapQuery();
    // ensure the transaction listener is added in the target data store
    result.addTransactionListener(typeChanges);
    return typeChanges.commitSubject()
      .filter(new Predicate<Set<Type<?>>>() {
        @Override
        public boolean test(Set<Type<?>> types) {
          return !Collections.disjoint(element.entityTypes(), types) ||
              Types.referencesType(element.entityTypes(), types);
        }
      }).map(new Function<Set<Type<?>>, ReactiveResult<T>>() {
        @Override
        public ReactiveResult<T> apply(Set<Type<?>> types) {
          return result;
        }
      }).startWith(result);
}

Code example source: origin: io.apptik.rhub/roxy-rxjava2

@Override
@SuppressWarnings("unchecked")
public <T> Observable<T> pub(final Class<T> filterClass) {
  return subj.filter(new Predicate() {
    @Override
    public boolean test(Object obj) throws Exception {
      return filterClass.isAssignableFrom(obj.getClass());
    }
  });
}

Code example source: origin: nemtech/nem2-sdk-java

/**
 * Returns an observable stream of BlockInfo.
 * Each time a new Block is added into the blockchain,
 * it emits a new BlockInfo in the event stream.
 *
 * @return an observable stream of BlockInfo
 */
public Observable<BlockInfo> newBlock() {
  this.subscribeTo(ListenerChannel.BLOCK.toString());
  return this.messageSubject
      .filter(rawMessage -> rawMessage.getChannel().equals(ListenerChannel.BLOCK))
      .map(rawMessage -> (BlockInfo) rawMessage.getMessage());
}

Code example source: origin: nemtech/nem2-sdk-java

/**
 * Returns an observable stream of {@link TransactionStatusError} for specific address.
 * Each time a transaction contains an error,
 * it emits a new message with the transaction status error in the event stream.
 *
 * @param address address we listen to be notified when some error happened
 * @return an observable stream of {@link TransactionStatusError}
 */
public Observable<TransactionStatusError> status(Address address) {
  this.subscribeTo(ListenerChannel.STATUS + "/" + address.plain());
  return this.messageSubject
      .filter(rawMessage -> rawMessage.getChannel().equals(ListenerChannel.STATUS))
      .map(rawMessage -> (TransactionStatusError) rawMessage.getMessage());
}

Code example source: origin: nemtech/nem2-sdk-java

/**
 * Returns an observable stream of {@link CosignatureSignedTransaction} for specific address.
 * Each time a cosigner signs a transaction that the address initiated,
 * it emits a new message with the cosignatory signed transaction in the event stream.
 *
 * @param address address we listen when a cosignatory is added to some transaction address sent
 * @return an observable stream of {@link CosignatureSignedTransaction}
 */
public Observable<CosignatureSignedTransaction> cosignatureAdded(Address address) {
  // Subscribe to the same channel that the filter below matches on.
  this.subscribeTo(ListenerChannel.COSIGNATURE + "/" + address.plain());
  return this.messageSubject
      .filter(rawMessage -> rawMessage.getChannel().equals(ListenerChannel.COSIGNATURE))
      .map(rawMessage -> (CosignatureSignedTransaction) rawMessage.getMessage());
}

Code example source: origin: nemtech/nem2-sdk-java

/**
 * Returns an observable stream of Transaction Hashes for specific address.
 * Each time a transaction with state unconfirmed changes its state,
 * it emits a new message with the transaction hash in the event stream.
 *
 * @param address address we listen when a transaction is removed from unconfirmed state
 * @return an observable stream of Strings with the transaction hash
 */
public Observable<String> unconfirmedRemoved(Address address) {
  this.subscribeTo(ListenerChannel.UNCONFIRMED_REMOVED + "/" + address.plain());
  return this.messageSubject
      .filter(rawMessage -> rawMessage.getChannel().equals(ListenerChannel.UNCONFIRMED_REMOVED))
      .map(rawMessage -> (String) rawMessage.getMessage());
}

Code example source: origin: nemtech/nem2-sdk-java

/**
 * Returns an observable stream of Transaction Hashes for specific address.
 * Each time an aggregate bonded transaction is removed,
 * it emits a new message with the transaction hash in the event stream.
 *
 * @param address address we listen when a transaction is confirmed or rejected
 * @return an observable stream of Strings with the transaction hash
 */
public Observable<String> aggregateBondedRemoved(Address address) {
  this.subscribeTo(ListenerChannel.AGGREGATE_BONDED_REMOVED + "/" + address.plain());
  return this.messageSubject
      .filter(rawMessage -> rawMessage.getChannel().equals(ListenerChannel.AGGREGATE_BONDED_REMOVED))
      .map(rawMessage -> (String) rawMessage.getMessage());
}

Code example source: origin: nemtech/nem2-sdk-java

/**
 * Return an observable of {@link AggregateTransaction} for specific address.
 * Each time an aggregate bonded transaction is announced,
 * it emits a new {@link AggregateTransaction} in the event stream.
 *
 * @param address address we listen when a transaction with missing signatures state
 * @return an observable stream of AggregateTransaction with missing signatures state
 */
public Observable<AggregateTransaction> aggregateBondedAdded(Address address) {
  this.subscribeTo(ListenerChannel.AGGREGATE_BONDED_ADDED + "/" + address.plain());
  return this.messageSubject
      .filter(rawMessage -> rawMessage.getChannel().equals(ListenerChannel.AGGREGATE_BONDED_ADDED))
      .map(rawMessage -> (AggregateTransaction) rawMessage.getMessage())
      .filter(transaction -> this.transactionFromAddress(transaction, address));
}

Code example source: origin: nemtech/nem2-sdk-java

/**
 * Returns an observable stream of Transaction for a specific address.
 * Each time a transaction is in unconfirmed state and it involves the address,
 * it emits a new Transaction in the event stream.
 *
 * @param address address we listen when a transaction is in unconfirmed state
 * @return an observable stream of Transaction with state unconfirmed
 */
public Observable<Transaction> unconfirmedAdded(Address address) {
  this.subscribeTo(ListenerChannel.UNCONFIRMED_ADDED + "/" + address.plain());
  return this.messageSubject
      .filter(rawMessage -> rawMessage.getChannel().equals(ListenerChannel.UNCONFIRMED_ADDED))
      .map(rawMessage -> (Transaction) rawMessage.getMessage())
      .filter(transaction -> this.transactionFromAddress(transaction, address));
}

Code example source: origin: nemtech/nem2-sdk-java

/**
 * Returns an observable stream of Transaction for a specific address.
 * Each time a transaction is in confirmed state and it involves the address,
 * it emits a new Transaction in the event stream.
 *
 * @param address address we listen when a transaction is in confirmed state
 * @return an observable stream of Transaction with state confirmed
 */
public Observable<Transaction> confirmed(final Address address) {
  this.subscribeTo(ListenerChannel.CONFIRMED_ADDED.toString() + "/" + address.plain());
  return this.messageSubject
      .filter(rawMessage -> rawMessage.getChannel().equals(ListenerChannel.CONFIRMED_ADDED))
      .map(rawMessage -> (Transaction) rawMessage.getMessage())
      .filter(transaction -> this.transactionFromAddress(transaction, address));
}

Code example source: origin: oVirt/moVirt

@Override
public ConnectionSettingsPresenter initialize() {
  super.initialize();
  try {
    messageHelper = environmentStore.getMessageHelper(account);
    AccountPropertiesManager propertiesManager = environmentStore.getAccountPropertiesManager(account);
    getView().setPasswordVisibility(commonSharedPreferencesHelper.isPasswordVisible());
    getView().displayApiUrl(propertiesManager.getApiUrl());
    getView().displayUserName(propertiesManager.getUsername());
    getView().displayPassword(propertiesManager.getPassword());
    getView().displayTitle(account.getName());
    getView().showLoginInProgress(environmentStore.isLoginInProgress(account));
    getDisposables().add(rxStore.LOGIN_STATUS
        .filter(loginStatus -> loginStatus.isAccount(account))
        .subscribeOn(Schedulers.computation())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(loginStatus -> getView().showLoginInProgress(loginStatus.isInProgress())));
  } catch (AccountDeletedException e) {
    finishSafe();
  }
  return this;
}
