Usage and code examples for the rx.Observable.filter() method

x33g5p2x, reposted on 2022-01-25 under "Other"

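This article collects code examples for the rx.Observable.filter() method in Java and shows how Observable.filter() is used in practice. The examples come mainly from GitHub, Stack Overflow, Maven, and similar platforms, and were extracted from selected projects, so they should be useful as references. The details of Observable.filter() are as follows:
Fully qualified name: rx.Observable
Class name: Observable
Method name: filter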

About Observable.filter

Filters items emitted by an Observable by only emitting those that satisfy a specified predicate.

Scheduler: filter does not operate by default on a particular Scheduler.
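To make the two statements above concrete, here is a minimal, standard-library-only sketch of what a filter operator does. `MiniObservable` and `FilterSketch` are hypothetical names introduced purely for illustration; they are a simplified stand-in, not RxJava's implementation, and they ignore errors, completion, and unsubscription. With RxJava 1.x on the classpath, the corresponding call chain would be `Observable.range(1, 6).filter(n -> n % 2 == 0)`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

/** Hypothetical, simplified push-based source. NOT rx.Observable. */
final class MiniObservable<T> {
    // Called once per subscription; pushes every item to the given observer.
    private final Consumer<Consumer<? super T>> onSubscribe;

    private MiniObservable(Consumer<Consumer<? super T>> onSubscribe) {
        this.onSubscribe = onSubscribe;
    }

    /** Emits the list's items, in order, on the thread that subscribes. */
    static <T> MiniObservable<T> from(List<T> items) {
        return new MiniObservable<T>(observer -> {
            for (T item : items) {
                observer.accept(item);
            }
        });
    }

    /** Forwards only the items for which the predicate returns true. */
    MiniObservable<T> filter(Predicate<? super T> predicate) {
        return new MiniObservable<T>(downstream ->
            onSubscribe.accept(item -> {
                if (predicate.test(item)) {
                    downstream.accept(item);
                }
            }));
    }

    void subscribe(Consumer<? super T> observer) {
        onSubscribe.accept(observer);
    }
}

final class FilterSketch {
    /** Keeps only the even numbers from 1..6. */
    static List<Integer> evens() {
        List<Integer> seen = new ArrayList<>();
        MiniObservable.from(Arrays.asList(1, 2, 3, 4, 5, 6))
            .filter(n -> n % 2 == 0)
            .subscribe(seen::add);
        return seen;
    }

    /** The operator introduces no thread hop: the predicate runs on the caller's thread. */
    static boolean predicateRunsOnCallerThread() {
        Thread caller = Thread.currentThread();
        List<Thread> threads = new ArrayList<>();
        MiniObservable.from(Arrays.asList(1, 2, 3))
            .filter(n -> {
                threads.add(Thread.currentThread());
                return true;
            })
            .subscribe(n -> { });
        return threads.size() == 3 && threads.stream().allMatch(t -> t == caller);
    }

    public static void main(String[] args) {
        System.out.println(evens()); // prints [2, 4, 6]
        System.out.println(predicateRunsOnCallerThread()); // prints true
    }
}
```

The sketch shows both points in miniature: items that fail the predicate never reach the downstream observer, and the operator itself does no scheduling, so the predicate runs wherever the upstream happens to emit.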

Code examples

Code example source: RichardWarburton/java-8-lambdas-exercises

    public Observable<Artist> search(String searchedName,
                                     String searchedNationality,
                                     int maxResults) {
        return getSavedArtists() // <1>
            .filter(name -> name.contains(searchedName)) // <2>
            .flatMap(this::lookupArtist) // <3>
            .filter(artist -> artist.getNationality() // <4>
                .contains(searchedNationality))
            .take(maxResults); // <5>
    }
    // END search

Code example source: konmik/nucleus

    @Override
    public Observable<Delivery<View, T>> call(Observable<T> observable) {
        return Observable
            .combineLatest(
                view,
                observable
                    .materialize()
                    .filter(new Func1<Notification<T>, Boolean>() {
                        @Override
                        public Boolean call(Notification<T> notification) {
                            return !notification.isOnCompleted();
                        }
                    }),
                new Func2<View, Notification<T>, Delivery<View, T>>() {
                    @Override
                    public Delivery<View, T> call(View view, Notification<T> notification) {
                        return view == null ? null : new Delivery<>(view, notification);
                    }
                })
            .filter(new Func1<Delivery<View, T>, Boolean>() {
                @Override
                public Boolean call(Delivery<View, T> delivery) {
                    return delivery != null;
                }
            });
    }

Code example source: amitshekhariitbhu/Fast-Android-Networking (truncated excerpt)

    .filter(new Func1<User, Boolean>() {
        @Override
        public Boolean call(User user) {

Code example source: HotBitmapGG/bilibili-android-client

    private void search() {
        RxView.clicks(mSearchBtn)
            .throttleFirst(2, TimeUnit.SECONDS)
            .map(aVoid -> mSearchEdit.getText().toString().trim())
            .filter(s -> !TextUtils.isEmpty(s))
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(s -> {
                KeyBoardUtil.closeKeybord(mSearchEdit, TotalStationSearchActivity.this);
                showSearchAnim();
                clearData();
                content = s;
                getSearchData();
            });
    }

Code example source: HotBitmapGG/bilibili-android-client (truncated excerpt)

    .filter(integer -> !TextUtils.isEmpty(mSearchEdit.getText().toString().trim()))
    .filter(integer -> integer == EditorInfo.IME_ACTION_SEARCH)
    .flatMap(new Func1<Integer, Observable<String>>() {
        @Override

Code example source: Rukey7/MvpApp

    /**
     * Initialize first, recording the photos that were downloaded previously.
     * @param dbDao
     */
    public static void init(BeautyPhotoInfoDao dbDao) {
        dbDao.queryBuilder().rx().list()
            .subscribeOn(Schedulers.io())
            .flatMap(new Func1<List<BeautyPhotoInfo>, Observable<BeautyPhotoInfo>>() {
                @Override
                public Observable<BeautyPhotoInfo> call(List<BeautyPhotoInfo> photoList) {
                    return Observable.from(photoList);
                }
            })
            .filter(new Func1<BeautyPhotoInfo, Boolean>() {
                @Override
                public Boolean call(BeautyPhotoInfo bean) {
                    return bean.isDownload();
                }
            })
            .subscribe(new Action1<BeautyPhotoInfo>() {
                @Override
                public void call(BeautyPhotoInfo bean) {
                    sDlPhotos.put(bean.getImgsrc().hashCode(), true);
                }
            });
    }

Code example source: apache/usergrid

    public Observable<MarkedEdge> repair( final ApplicationScope scope, final MarkedEdge edge, final UUID timestamp ) {
        //merge source and target then deal with the distinct values
        return Observable.just( edge ).filter( markedEdge -> markedEdge.isDeleted() )
            .doOnNext( markedEdge -> {
                //it's still in the same state as it was when we queued it. Remove it
                if ( logger.isDebugEnabled() ) {
                    logger.debug( "Removing edge {} ", markedEdge );
                }
                //remove from the commit log
                //remove from storage
                try {
                    storageSerialization.deleteEdge( scope, markedEdge, timestamp ).execute();
                }
                catch ( ConnectionException e ) {
                    throw new RuntimeException( "Unable to connect to casandra", e );
                }
            } );
    }

Code example source: Rukey7/MvpApp (truncated excerpt)

    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .filter(new Func1<NewsTypeInfo, Boolean>() {
        @Override
        public Boolean call(NewsTypeInfo newsTypeBean) {

Code example source: Rukey7/MvpApp

    @Override
    public void getData(boolean isRefresh) {
        mDbDao.queryBuilder().rx()
            .oneByOne()
            .filter(new Func1<VideoInfo, Boolean>() {
                @Override
                public Boolean call(VideoInfo info) {
                    // Check whether the video is currently being downloaded
                    return (info.getDownloadStatus() != DownloadStatus.NORMAL &&
                            info.getDownloadStatus() != DownloadStatus.COMPLETE);
                }
            })
            .toList()
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Action1<List<VideoInfo>>() {
                @Override
                public void call(List<VideoInfo> videoList) {
                    if (ListUtils.isEmpty(videoList)) {
                        mView.noData();
                    } else {
                        mView.loadData(videoList);
                    }
                }
            });
    }

Code example source: apache/usergrid

    @Override
    public Observable<Edge> deleteEdge( final Edge edge ) {
        GraphValidation.validateEdge( edge );
        final UUID startTimestamp = UUIDGenerator.newTimeUUID();
        final Observable<Edge> observable =
            Observable.create( new ObservableIterator<MarkedEdge>( "read edge versions" ) {
                @Override
                protected Iterator<MarkedEdge> getIterator() {
                    return storageEdgeSerialization.getEdgeVersions( scope,
                        new SimpleSearchByEdge( edge.getSourceNode(), edge.getType(), edge.getTargetNode(),
                            Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING, Optional.absent() ) );
                }
            } ).filter( markedEdge -> markedEdge.isDeleted() ).flatMap( marked ->
                //fire our delete listener and wait for the results
                edgeDeleteListener.receive( scope, marked, startTimestamp ).doOnNext(
                    //log them
                    count -> logger.trace( "removed {} types for edge {} ", count, edge ) )
                    //return the marked edge
                    .map( count -> marked ) );
        return ObservableTimer.time( observable, deleteEdgeTimer );
    }

Code example source: konmik/nucleus (truncated excerpt)

    final Subscription subscription = observable
        .materialize()
        .filter(new Func1<Notification<T>, Boolean>() {
            @Override
            public Boolean call(Notification<T> notification) {

Code example source: konmik/nucleus

    @Override
    public Observable<Delivery<View, T>> call(Observable<T> observable) {
        return observable.materialize()
            .take(1)
            .switchMap(new Func1<Notification<T>, Observable<? extends Delivery<View, T>>>() {
                @Override
                public Observable<? extends Delivery<View, T>> call(final Notification<T> notification) {
                    return view.map(new Func1<View, Delivery<View, T>>() {
                        @Override
                        public Delivery<View, T> call(View view) {
                            return view == null ? null : new Delivery<>(view, notification);
                        }
                    });
                }
            })
            .filter(new Func1<Delivery<View, T>, Boolean>() {
                @Override
                public Boolean call(Delivery<View, T> delivery) {
                    return delivery != null;
                }
            })
            .take(1);
    }

Code example source: apache/usergrid

    private Observable.Transformer<IndexOperationMessage, IndexOperationMessage> applyCollector(AsyncEventQueueType queueType) {
        return observable -> observable
            .collect(() -> new IndexOperationMessage(), (collector, single) -> collector.ingest(single))
            .filter(msg -> !msg.isEmpty())
            .doOnNext(indexOperation -> {
                asyncEventService.queueIndexOperationMessage(indexOperation, queueType);
            });
    }

Code example source: apache/usergrid (truncated excerpt)

    .filter(id -> {
        final String type = InflectionUtils.pluralize(((Id) id).getType());
        return ! (type.equals(Schema.COLLECTION_USERS)

Code example source: apache/usergrid

    /**
     * Use Graph to get old appinfos from the old and deprecated System App.
     */
    public Observable<org.apache.usergrid.persistence.model.entity.Entity> getOldAppInfos() {
        final ApplicationScope systemAppScope = getApplicationScope( SYSTEM_APP_ID );
        final EntityCollectionManager systemCollectionManager =
            entityCollectionManagerFactory.createCollectionManager( systemAppScope );
        final GraphManager gm = graphManagerFactory.createEdgeManager( systemAppScope );
        String edgeType = CpNamingUtils.getEdgeTypeFromCollectionName( "appinfos" );
        Id rootAppId = systemAppScope.getApplication();
        final SimpleSearchByEdgeType simpleSearchByEdgeType =
            new SimpleSearchByEdgeType( rootAppId, edgeType, Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING,
                Optional.absent() );
        Observable<org.apache.usergrid.persistence.model.entity.Entity> entityObs =
            gm.loadEdgesFromSource( simpleSearchByEdgeType ).flatMap( edge -> {
                final Id appInfoId = edge.getTargetNode();
                return systemCollectionManager.load( appInfoId ).filter( entity -> ( entity != null ) );
            } );
        return entityObs;
    }

Code example source: apache/usergrid

    @Override
    public Observable<IndexOperationMessage> buildEntityIndex( final EntityIndexOperation entityIndexOperation ) {
        final ApplicationScope applicationScope = entityIndexOperation.getApplicationScope();
        final Id entityId = entityIndexOperation.getId();
        //load the entity
        return entityCollectionManagerFactory.createCollectionManager( applicationScope ).load( entityId ).filter(
            entity -> {
                final Field<Long> modified = entity.getField( Schema.PROPERTY_MODIFIED );
                /**
                 * We don't have a modified field, so we can't check, pass it through
                 */
                if ( modified == null ) {
                    return true;
                }
                //entityIndexOperation.getUpdatedSince will always be 0 except for reindexing the application
                //only re-index if it has been updated and been updated after our timestamp
                return modified.getValue() >= entityIndexOperation.getUpdatedSince();
            } )
            //perform indexing on the task scheduler and start it
            .flatMap( entity -> indexService.indexEntity( applicationScope, entity ) );
    }

Code example source: apache/usergrid

    /**
     * Tests that reduce emits
     */
    @Test
    public void testReduceEmpty(){
        final int result = Observable.range( 0, 100 )
            .filter( value -> value == -1 )
            .reduce( 0, ( integer, integer2 ) -> integer + 1 )
            .toBlocking().last();
        assertEquals(0, result);
    }

Code example source: ReactiveX/RxNetty

    @Override
    public void call(Subscriber<? super Connection<R, W>> subscriber) {
        if (isShutdown) {
            subscriber.onError(new IllegalStateException("Connection provider is shutdown."));
        }
        idleConnectionsHolder.pollThisEventLoopConnections()
            .concatWith(connectIfAllowed())
            .filter(new Func1<PooledConnection<R, W>, Boolean>() {
                @Override
                public Boolean call(PooledConnection<R, W> c) {
                    boolean isUsable = c.isUsable();
                    if (!isUsable) {
                        discardNow(c);
                    }
                    return isUsable;
                }
            })
            .take(1)
            .lift(new ReuseSubscriberLinker())
            .lift(new ConnectMetricsOperator())
            .unsafeSubscribe(subscriber);
    }

Code example source: apache/usergrid

    /**
     * TODO: Use Graph to get application_info for an specified Application.
     */
    private org.apache.usergrid.persistence.Entity getApplicationInfo( final UUID appId ) throws Exception {
        final ApplicationScope managementAppScope = getApplicationScope( CpNamingUtils.MANAGEMENT_APPLICATION_ID );
        final EntityCollectionManager managementCollectionManager =
            entityCollectionManagerFactory.createCollectionManager( managementAppScope );
        Observable<MarkedEdge> edgesObservable = getApplicationInfoEdges( appId );
        //get the graph for all app infos
        Observable<org.apache.usergrid.persistence.model.entity.Entity> entityObs = edgesObservable.flatMap( edge -> {
            final Id appInfoId = edge.getTargetNode();
            return managementCollectionManager.load( appInfoId ).filter( entity -> {
                //check for app id
                return entity != null ? entity.getId().getUuid().equals( appId ) : false;
            } );
        } );
        // don't expect many applications, so we block
        org.apache.usergrid.persistence.model.entity.Entity applicationInfo =
            entityObs.toBlocking().lastOrDefault( null );
        if ( applicationInfo == null ) {
            return null;
        }
        Class clazz = Schema.getDefaultSchema().getEntityClass( applicationInfo.getId().getType() );
        org.apache.usergrid.persistence.Entity entity =
            EntityFactory.newEntity( applicationInfo.getId().getUuid(), applicationInfo.getId().getType(), clazz );
        entity.setProperties( CpEntityMapUtils.toMap( applicationInfo ) );
        return entity;
    }
