rx.Observable.publish()方法的使用及代码示例

x33g5p2x  于2022-01-25 转载在 其他  
字(11.8k)|赞(0)|评价(0)|浏览(131)

本文整理了Java中rx.Observable.publish()方法的一些代码示例,展示了Observable.publish()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Observable.publish()方法的具体详情如下:
包路径:rx.Observable
类名称:Observable
方法名:publish

Observable.publish介绍

[英]Returns a ConnectableObservable, which waits until its ConnectableObservable#connect method is called before it begins emitting items to those Observers that have subscribed to it.

Backpressure Support: This operator does not support backpressure because multicasting means the stream is "hot" with multiple subscribers. Each child will need to manage backpressure independently using operators such as #onBackpressureDrop and #onBackpressureBuffer. Scheduler: publish does not operate by default on a particular Scheduler.
[中]返回一个ConnectableObservable,它将等待调用其ConnectableObservable#connect方法,然后开始向订阅它的观察者发送项目。
背压支持:该操作员不支持背压,因为多播意味着流对于多个订户来说是“热的”。每个孩子都需要使用#onBackpressureDrop和#onBackpressureBuffer等操作符独立管理背压。计划程序:默认情况下,发布不会在特定计划程序上运行。

代码示例

代码示例来源:origin: apache/usergrid

@Test
@Category(ExperimentalTest.class )
public void testConnectableObserver() throws InterruptedException {
  final int count = 10;
  final CountDownLatch latch = new CountDownLatch( count );
  final ConnectableObservable<Integer> connectedObservable = Observable.range( 0, count ).publish();
  //connect to our latch, which should run on it's own subscription
  //start our latch running
  connectedObservable.doOnNext( integer -> latch.countDown() ).subscribeOn( Schedulers.io() ).subscribe();
  final Observable<Integer> countObservable = connectedObservable.subscribeOn( Schedulers.io() ).count();
  //start the sequence
  connectedObservable.connect();
  final boolean completed = latch.await( 5, TimeUnit.SECONDS );
  assertTrue( "publish1 behaves as expected", completed );
  final int returnedCount = countObservable.toBlocking().last();
  assertEquals( "Counts the same", count, returnedCount );
}

代码示例来源:origin: cn-ljb/rxjava_for_android

@Override
public void onStart() {
  super.onStart();
  //将普通的Observable转换为可连接的Observable
  ConnectableObservable<Object> publish = _rxBus.toObserverable().publish();
  publish.compose(this.bindToLifecycle())
      .subscribe(new Action1<Object>() { //一个一旦被触发就会显示TapText的监听者
        @Override
        public void call(Object event) {
          if (event instanceof RxBusDemoFragment.TapEvent) {
            _showTapText();
          }
        }
      });
  publish.compose(this.bindUntilEvent(FragmentEvent.DESTROY))
      .publish(new Func1<Observable<Object>, Observable<List<Object>>>() {//一个出发后缓存一秒内的点击数并显示的监听者
        @Override
        public Observable<List<Object>> call(Observable<Object> stream) {
          return stream.buffer(stream.debounce(1, TimeUnit.SECONDS)); //进行缓冲1秒,打包发送
        }
      }).observeOn(AndroidSchedulers.mainThread()).subscribe(new Action1<List<Object>>() {
    @Override
    public void call(List<Object> taps) {
      _showTapCount(taps.size());
    }
  });
  publish.connect();  //可连接的Observable并不在订阅时触发,而需手动调用connect()方法
}

代码示例来源:origin: jhusain/learnrxjava

public static void main(String args[]) {
  // debounce to the last value in each burst
  //intermittentBursts().debounce(10, TimeUnit.MILLISECONDS).toBlocking().forEach(System.out::println);
  /* The following will emit a buffered list as it is debounced */
  // first we multicast the stream ... using refCount so it handles the subscribe/unsubscribe
  Observable<Integer> burstStream = intermittentBursts().take(20).publish().refCount();
  // then we get the debounced version
  Observable<Integer> debounced = burstStream.debounce(10, TimeUnit.MILLISECONDS);
  // then the buffered one that uses the debounced stream to demark window start/stop
  Observable<List<Integer>> buffered = burstStream.buffer(debounced);
  // then we subscribe to the buffered stream so it does what we want
  buffered.toBlocking().forEach(System.out::println);
}

代码示例来源:origin: THEONE10211024/RxJavaSamples

@Override
public void onStart() {
  super.onStart();
  _subscriptions = new CompositeSubscription();
  ConnectableObservable<Object> tapEventEmitter = _rxBus.toObserverable().publish();
  _subscriptions//
     .add(tapEventEmitter.subscribe(new Action1<Object>() {
       @Override
       public void call(Object event) {
         if (event instanceof RxBusDemoFragment.TapEvent) {
           _showTapText();
         }
       }
     }));
  _subscriptions//
     .add(tapEventEmitter.publish(new Func1<Observable<Object>, Observable<List<Object>>>() {
       @Override
       public Observable<List<Object>> call(Observable<Object> stream) {
         return stream.buffer(stream.debounce(1, TimeUnit.SECONDS));
       }
     }).observeOn(AndroidSchedulers.mainThread()).subscribe(new Action1<List<Object>>() {
       @Override
       public void call(List<Object> taps) {
         _showTapCount(taps.size());
       }
     }));
  _subscriptions.add(tapEventEmitter.connect());
}

代码示例来源:origin: spotify/mobius

@Override
 public Observable<R> call(Observable<T> input) {
  return input.publish(
    new Func1<Observable<T>, Observable<R>>() {
     @Override
     public Observable<R> call(Observable<T> innerInput) {
      final List<Observable<R>> transformed = new ArrayList<>();
      for (Observable.Transformer<T, R> transformer : transformers) {
       transformed.add(innerInput.compose(transformer));
      }
      return Observable.merge(transformed);
     }
    });
 }
}

代码示例来源:origin: com.github.cloudfoundry-community/snotel

@Override
  public Observable<Envelope> open() {
    if (closed) {
      throw new IllegalStateException("The firehose client is closed.");
    }
    return Observable
        .create(onSubscribe)
        .publish()
        .refCount();
  }
};

代码示例来源:origin: stephanenicolas/toothpick

@Inject
public RxPresenter() {
 timeObservable = Observable.interval(1, TimeUnit.SECONDS) //
   .subscribeOn(Schedulers.newThread()) //
   .observeOn(AndroidSchedulers.mainThread()) //
   .publish();
 connect = timeObservable.connect();
}

代码示例来源:origin: ordina-jworks/microservices-dashboard-server

protected Observable<String> getServiceIdsFromDiscoveryClient() {
  logger.info("Discovering services for health");
  return Observable.from(discoveryClient.getServices()).subscribeOn(Schedulers.io()).publish().autoConnect()
      .map(id -> id.toLowerCase())
      .filter(id -> !id.equals(ZUUL))
      .doOnNext(s -> logger.debug("Service discovered: " + s))
      .doOnError(e -> errorHandler.handleSystemError("Error filtering services: " + e.getMessage(), e))
      .retry();
}

代码示例来源:origin: ordina-jworks/microservices-dashboard-server

private Observable<String> getServicesFromDiscoveryClient() {
  logger.info("Discovering services");
  return Observable.from(discoveryClient.getServices()).subscribeOn(Schedulers.io()).publish().autoConnect()
      .map(String::toLowerCase)
      .doOnNext(s -> logger.debug("Service discovered: " + s))
      .doOnError(e -> {
        String error = "Error retrieving services: " + e.getMessage();
        logger.error(error);
        publisher.publishEvent(new SystemEvent(error, e));
      })
      .retry();
}

代码示例来源:origin: ordina-jworks/microservices-dashboard-server

protected Observable<String> getServiceIdsFromDiscoveryClient() {
  logger.info("Discovering services for mappings");
  return Observable.from(discoveryClient.getServices()).subscribeOn(Schedulers.io()).publish().autoConnect()
      .map(id -> id.toLowerCase())
      .filter(id -> !id.equals(ZUUL))
      .doOnNext(s -> logger.debug("Service discovered: " + s))
      .doOnError(e -> errorHandler.handleSystemError("Error filtering services: " + e.getMessage(), e))
      .retry();
}

代码示例来源:origin: com.github.kmbulebu.nicknack/nicknack-core

private ProviderServiceImpl(final Path providersDirectory, final XMLConfiguration configuration) {
  this.configuration = configuration;
  loader = new ProviderLoader(providersDirectory);
  eventStream = Observable.create(this).publish();
  eventStream.connect();
  // TODO Do something smart with the errors.
  initializeProviders();
}

代码示例来源:origin: akarnokd/RxAgera

public RxObservableAsAgera(rx.Observable<?> source) {
  if (source instanceof Subject) {
    // no need to publish/autoConnect a subject
    this.source = source.doAfterTerminate(this);
  } else {
    this.source = source.doAfterTerminate(this).publish().autoConnect();
  }
  this.updatables = new HashMap<>();
}

代码示例来源:origin: dswarm/dswarm

protected ConnectableObservable<GDMModel> transformResultModel(final Observable<org.dswarm.persistence.model.internal.Model> model) {
  final AtomicInteger resultCounter = new AtomicInteger(0);
  return model
      .onBackpressureBuffer(10000)
      .doOnSubscribe(() -> GDMModelTransformationFlow.LOG.debug("subscribed to results observable in transformation engine"))
      .doOnNext(resultObj -> {
        resultCounter.incrementAndGet();
        if (resultCounter.get() == 1) {
          GDMModelTransformationFlow.LOG.debug("received first result in transformation engine");
        }
      })
      .doOnCompleted(() -> GDMModelTransformationFlow.LOG.debug("received '{}' results in transformation engine overall", resultCounter.get()))
      .cast(org.dswarm.persistence.model.internal.gdm.GDMModel.class)
      .onBackpressureBuffer(10000)
      .publish();
}

代码示例来源:origin: dswarm/dswarm

protected ConnectableObservable<org.dswarm.persistence.model.internal.Model> doPostProcessingOfResultModel(final GDMModelReceiver writer, final Scheduler scheduler) {
  final ConnectableObservable<GDMModel> modelConnectableObservable = writer.getObservable()
      .observeOn(scheduler)
      .onBackpressureBuffer(10000)
      .publish();
  final ConnectableObservable<org.dswarm.persistence.model.internal.Model> model = doPostProcessingOfResultModel(modelConnectableObservable);
  modelConnectableObservable.connect();
  return model;
}

代码示例来源:origin: org.springframework.cloud/spring-cloud-bus-turbine

@Bean
public HttpServer<ByteBuf, ServerSentEvent> aggregatorServer() {
  // multicast so multiple concurrent subscribers get the same stream
  Observable<Map<String, Object>> publishedStreams = StreamAggregator.aggregateGroupedStreams(hystrixSubject()
        .groupBy(data -> InstanceKey.create((String) data.get("instanceId"))))
      .doOnUnsubscribe(() -> log.info("BusTurbine => Unsubscribing aggregation."))
      .doOnSubscribe(() -> log.info("BusTurbine => Starting aggregation"))
      .flatMap(o -> o).publish().refCount();
  int port = new Integer(env.getProperty("server.port", "8989"));
  HttpServer<ByteBuf, ServerSentEvent> httpServer = RxNetty.createHttpServer(port, (request, response) -> {
    log.info("BusTurbine => SSE Request Received");
    response.getHeaders().setHeader("Content-Type", "text/event-stream");
    return publishedStreams
        .doOnUnsubscribe(() -> log.info("BusTurbine => Unsubscribing RxNetty server connection"))
        .flatMap(data -> response.writeAndFlush(new ServerSentEvent(null, null, JsonUtility.mapToJson(data))));
  }, sseServerConfigurator());
  return httpServer;
}

代码示例来源:origin: nurkiewicz/rxjava-book-examples

@Test
public void sample_186() throws Exception {
  Observable<Status> observable = status();
  Observable<Status> lazy = observable.publish().refCount();
  //...
  System.out.println("Before subscribers");
  Subscription sub1 = lazy.subscribe();
  System.out.println("Subscribed 1");
  Subscription sub2 = lazy.subscribe();
  System.out.println("Subscribed 2");
  sub1.unsubscribe();
  System.out.println("Unsubscribed 1");
  sub2.unsubscribe();
  System.out.println("Unsubscribed 2");
}

代码示例来源:origin: akarnokd/akarnokd-misc

public static void main(String[] args) {
    Observable.range(1, 10)
    .doOnSubscribe(() -> System.out.println("Subscribed"))
    .publish(o -> Observable.zip(o.map(v -> v * 10), o.map(v -> v * 20), (a, b) -> a + "-" + b))
    .subscribe(System.out::println, Throwable::printStackTrace);

  }
}

代码示例来源:origin: nurkiewicz/rxjava-book-examples

@Test
public void sample_206() throws Exception {
  final Observable<Status> tweets = status();
  ConnectableObservable<Status> published = tweets.publish();
  published.connect();
}

代码示例来源:origin: akarnokd/akarnokd-misc

static <T> Observable.Transformer<T, T> debounceFirst(long timeout, TimeUnit unit) {
    return f ->
      f.publish(g ->
        g.take(1)
        .concatWith(
          g.switchMap(u -> Observable.timer(timeout, unit).map(w -> u))
          .take(1)
          .ignoreElements()
        )
        .repeatWhen(h -> h.takeUntil(g.ignoreElements()))
      )
      ;
  }
}

代码示例来源:origin: nurkiewicz/rxjava-book-examples

@Test
public void sample_48() throws Exception {
  ConnectableObservable<Long> upstream = Observable
      .interval(99, MILLISECONDS)
      .publish();
  upstream
      .debounce(100, MILLISECONDS)
      .timeout(1, SECONDS, upstream.take(1));
  upstream.connect();
}

相关文章

Observable类方法