rx.Observable.publish()方法的使用及代码示例

x33g5p2x  于2022-01-25 转载在 其他  
字(11.8k)|赞(0)|评价(0)|浏览(146)

本文整理了Java中rx.Observable.publish()方法的一些代码示例,展示了Observable.publish()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Observable.publish()方法的具体详情如下:
包路径:rx.Observable
类名称:Observable
方法名:publish

Observable.publish介绍

[英]Returns a ConnectableObservable, which waits until its ConnectableObservable#connect method is called before it begins emitting items to those Observers that have subscribed to it.

Backpressure Support: This operator does not support backpressure because multicasting means the stream is "hot" with multiple subscribers. Each child will need to manage backpressure independently using operators such as #onBackpressureDrop and #onBackpressureBuffer. Scheduler: publish does not operate by default on a particular Scheduler.
[中]返回一个ConnectableObservable,它将等待调用其ConnectableObservable#connect方法,然后开始向订阅它的观察者发送项目。
背压支持:该操作员不支持背压,因为多播意味着流对于多个订户来说是“热的”。每个孩子都需要使用#onBackpressureDrop和#onBackpressureBuffer等操作符独立管理背压。计划程序:默认情况下,发布不会在特定计划程序上运行。

代码示例

代码示例来源:origin: apache/usergrid

  1. @Test
  2. @Category(ExperimentalTest.class )
  3. public void testConnectableObserver() throws InterruptedException {
  4. final int count = 10;
  5. final CountDownLatch latch = new CountDownLatch( count );
  6. final ConnectableObservable<Integer> connectedObservable = Observable.range( 0, count ).publish();
  7. //connect to our latch, which should run on it's own subscription
  8. //start our latch running
  9. connectedObservable.doOnNext( integer -> latch.countDown() ).subscribeOn( Schedulers.io() ).subscribe();
  10. final Observable<Integer> countObservable = connectedObservable.subscribeOn( Schedulers.io() ).count();
  11. //start the sequence
  12. connectedObservable.connect();
  13. final boolean completed = latch.await( 5, TimeUnit.SECONDS );
  14. assertTrue( "publish1 behaves as expected", completed );
  15. final int returnedCount = countObservable.toBlocking().last();
  16. assertEquals( "Counts the same", count, returnedCount );
  17. }

代码示例来源:origin: cn-ljb/rxjava_for_android

  1. @Override
  2. public void onStart() {
  3. super.onStart();
  4. //将普通的Observable转换为可连接的Observable
  5. ConnectableObservable<Object> publish = _rxBus.toObserverable().publish();
  6. publish.compose(this.bindToLifecycle())
  7. .subscribe(new Action1<Object>() { //一个一旦被触发就会显示TapText的监听者
  8. @Override
  9. public void call(Object event) {
  10. if (event instanceof RxBusDemoFragment.TapEvent) {
  11. _showTapText();
  12. }
  13. }
  14. });
  15. publish.compose(this.bindUntilEvent(FragmentEvent.DESTROY))
  16. .publish(new Func1<Observable<Object>, Observable<List<Object>>>() {//一个出发后缓存一秒内的点击数并显示的监听者
  17. @Override
  18. public Observable<List<Object>> call(Observable<Object> stream) {
  19. return stream.buffer(stream.debounce(1, TimeUnit.SECONDS)); //进行缓冲1秒,打包发送
  20. }
  21. }).observeOn(AndroidSchedulers.mainThread()).subscribe(new Action1<List<Object>>() {
  22. @Override
  23. public void call(List<Object> taps) {
  24. _showTapCount(taps.size());
  25. }
  26. });
  27. publish.connect(); //可连接的Observable并不在订阅时触发,而需手动调用connect()方法
  28. }

代码示例来源:origin: jhusain/learnrxjava

  1. public static void main(String args[]) {
  2. // debounce to the last value in each burst
  3. //intermittentBursts().debounce(10, TimeUnit.MILLISECONDS).toBlocking().forEach(System.out::println);
  4. /* The following will emit a buffered list as it is debounced */
  5. // first we multicast the stream ... using refCount so it handles the subscribe/unsubscribe
  6. Observable<Integer> burstStream = intermittentBursts().take(20).publish().refCount();
  7. // then we get the debounced version
  8. Observable<Integer> debounced = burstStream.debounce(10, TimeUnit.MILLISECONDS);
  9. // then the buffered one that uses the debounced stream to demark window start/stop
  10. Observable<List<Integer>> buffered = burstStream.buffer(debounced);
  11. // then we subscribe to the buffered stream so it does what we want
  12. buffered.toBlocking().forEach(System.out::println);
  13. }

代码示例来源:origin: THEONE10211024/RxJavaSamples

  1. @Override
  2. public void onStart() {
  3. super.onStart();
  4. _subscriptions = new CompositeSubscription();
  5. ConnectableObservable<Object> tapEventEmitter = _rxBus.toObserverable().publish();
  6. _subscriptions//
  7. .add(tapEventEmitter.subscribe(new Action1<Object>() {
  8. @Override
  9. public void call(Object event) {
  10. if (event instanceof RxBusDemoFragment.TapEvent) {
  11. _showTapText();
  12. }
  13. }
  14. }));
  15. _subscriptions//
  16. .add(tapEventEmitter.publish(new Func1<Observable<Object>, Observable<List<Object>>>() {
  17. @Override
  18. public Observable<List<Object>> call(Observable<Object> stream) {
  19. return stream.buffer(stream.debounce(1, TimeUnit.SECONDS));
  20. }
  21. }).observeOn(AndroidSchedulers.mainThread()).subscribe(new Action1<List<Object>>() {
  22. @Override
  23. public void call(List<Object> taps) {
  24. _showTapCount(taps.size());
  25. }
  26. }));
  27. _subscriptions.add(tapEventEmitter.connect());
  28. }

代码示例来源:origin: spotify/mobius

  1. @Override
  2. public Observable<R> call(Observable<T> input) {
  3. return input.publish(
  4. new Func1<Observable<T>, Observable<R>>() {
  5. @Override
  6. public Observable<R> call(Observable<T> innerInput) {
  7. final List<Observable<R>> transformed = new ArrayList<>();
  8. for (Observable.Transformer<T, R> transformer : transformers) {
  9. transformed.add(innerInput.compose(transformer));
  10. }
  11. return Observable.merge(transformed);
  12. }
  13. });
  14. }
  15. }

代码示例来源:origin: com.github.cloudfoundry-community/snotel

  1. @Override
  2. public Observable<Envelope> open() {
  3. if (closed) {
  4. throw new IllegalStateException("The firehose client is closed.");
  5. }
  6. return Observable
  7. .create(onSubscribe)
  8. .publish()
  9. .refCount();
  10. }
  11. };

代码示例来源:origin: stephanenicolas/toothpick

  1. @Inject
  2. public RxPresenter() {
  3. timeObservable = Observable.interval(1, TimeUnit.SECONDS) //
  4. .subscribeOn(Schedulers.newThread()) //
  5. .observeOn(AndroidSchedulers.mainThread()) //
  6. .publish();
  7. connect = timeObservable.connect();
  8. }

代码示例来源:origin: ordina-jworks/microservices-dashboard-server

  1. protected Observable<String> getServiceIdsFromDiscoveryClient() {
  2. logger.info("Discovering services for health");
  3. return Observable.from(discoveryClient.getServices()).subscribeOn(Schedulers.io()).publish().autoConnect()
  4. .map(id -> id.toLowerCase())
  5. .filter(id -> !id.equals(ZUUL))
  6. .doOnNext(s -> logger.debug("Service discovered: " + s))
  7. .doOnError(e -> errorHandler.handleSystemError("Error filtering services: " + e.getMessage(), e))
  8. .retry();
  9. }

代码示例来源:origin: ordina-jworks/microservices-dashboard-server

  1. private Observable<String> getServicesFromDiscoveryClient() {
  2. logger.info("Discovering services");
  3. return Observable.from(discoveryClient.getServices()).subscribeOn(Schedulers.io()).publish().autoConnect()
  4. .map(String::toLowerCase)
  5. .doOnNext(s -> logger.debug("Service discovered: " + s))
  6. .doOnError(e -> {
  7. String error = "Error retrieving services: " + e.getMessage();
  8. logger.error(error);
  9. publisher.publishEvent(new SystemEvent(error, e));
  10. })
  11. .retry();
  12. }

代码示例来源:origin: ordina-jworks/microservices-dashboard-server

  1. protected Observable<String> getServiceIdsFromDiscoveryClient() {
  2. logger.info("Discovering services for mappings");
  3. return Observable.from(discoveryClient.getServices()).subscribeOn(Schedulers.io()).publish().autoConnect()
  4. .map(id -> id.toLowerCase())
  5. .filter(id -> !id.equals(ZUUL))
  6. .doOnNext(s -> logger.debug("Service discovered: " + s))
  7. .doOnError(e -> errorHandler.handleSystemError("Error filtering services: " + e.getMessage(), e))
  8. .retry();
  9. }

代码示例来源:origin: com.github.kmbulebu.nicknack/nicknack-core

  1. private ProviderServiceImpl(final Path providersDirectory, final XMLConfiguration configuration) {
  2. this.configuration = configuration;
  3. loader = new ProviderLoader(providersDirectory);
  4. eventStream = Observable.create(this).publish();
  5. eventStream.connect();
  6. // TODO Do something smart with the errors.
  7. initializeProviders();
  8. }

代码示例来源:origin: akarnokd/RxAgera

  1. public RxObservableAsAgera(rx.Observable<?> source) {
  2. if (source instanceof Subject) {
  3. // no need to publish/autoConnect a subject
  4. this.source = source.doAfterTerminate(this);
  5. } else {
  6. this.source = source.doAfterTerminate(this).publish().autoConnect();
  7. }
  8. this.updatables = new HashMap<>();
  9. }

代码示例来源:origin: dswarm/dswarm

  1. protected ConnectableObservable<GDMModel> transformResultModel(final Observable<org.dswarm.persistence.model.internal.Model> model) {
  2. final AtomicInteger resultCounter = new AtomicInteger(0);
  3. return model
  4. .onBackpressureBuffer(10000)
  5. .doOnSubscribe(() -> GDMModelTransformationFlow.LOG.debug("subscribed to results observable in transformation engine"))
  6. .doOnNext(resultObj -> {
  7. resultCounter.incrementAndGet();
  8. if (resultCounter.get() == 1) {
  9. GDMModelTransformationFlow.LOG.debug("received first result in transformation engine");
  10. }
  11. })
  12. .doOnCompleted(() -> GDMModelTransformationFlow.LOG.debug("received '{}' results in transformation engine overall", resultCounter.get()))
  13. .cast(org.dswarm.persistence.model.internal.gdm.GDMModel.class)
  14. .onBackpressureBuffer(10000)
  15. .publish();
  16. }

代码示例来源:origin: dswarm/dswarm

  1. protected ConnectableObservable<org.dswarm.persistence.model.internal.Model> doPostProcessingOfResultModel(final GDMModelReceiver writer, final Scheduler scheduler) {
  2. final ConnectableObservable<GDMModel> modelConnectableObservable = writer.getObservable()
  3. .observeOn(scheduler)
  4. .onBackpressureBuffer(10000)
  5. .publish();
  6. final ConnectableObservable<org.dswarm.persistence.model.internal.Model> model = doPostProcessingOfResultModel(modelConnectableObservable);
  7. modelConnectableObservable.connect();
  8. return model;
  9. }

代码示例来源:origin: org.springframework.cloud/spring-cloud-bus-turbine

  1. @Bean
  2. public HttpServer<ByteBuf, ServerSentEvent> aggregatorServer() {
  3. // multicast so multiple concurrent subscribers get the same stream
  4. Observable<Map<String, Object>> publishedStreams = StreamAggregator.aggregateGroupedStreams(hystrixSubject()
  5. .groupBy(data -> InstanceKey.create((String) data.get("instanceId"))))
  6. .doOnUnsubscribe(() -> log.info("BusTurbine => Unsubscribing aggregation."))
  7. .doOnSubscribe(() -> log.info("BusTurbine => Starting aggregation"))
  8. .flatMap(o -> o).publish().refCount();
  9. int port = new Integer(env.getProperty("server.port", "8989"));
  10. HttpServer<ByteBuf, ServerSentEvent> httpServer = RxNetty.createHttpServer(port, (request, response) -> {
  11. log.info("BusTurbine => SSE Request Received");
  12. response.getHeaders().setHeader("Content-Type", "text/event-stream");
  13. return publishedStreams
  14. .doOnUnsubscribe(() -> log.info("BusTurbine => Unsubscribing RxNetty server connection"))
  15. .flatMap(data -> response.writeAndFlush(new ServerSentEvent(null, null, JsonUtility.mapToJson(data))));
  16. }, sseServerConfigurator());
  17. return httpServer;
  18. }

代码示例来源:origin: nurkiewicz/rxjava-book-examples

  1. @Test
  2. public void sample_186() throws Exception {
  3. Observable<Status> observable = status();
  4. Observable<Status> lazy = observable.publish().refCount();
  5. //...
  6. System.out.println("Before subscribers");
  7. Subscription sub1 = lazy.subscribe();
  8. System.out.println("Subscribed 1");
  9. Subscription sub2 = lazy.subscribe();
  10. System.out.println("Subscribed 2");
  11. sub1.unsubscribe();
  12. System.out.println("Unsubscribed 1");
  13. sub2.unsubscribe();
  14. System.out.println("Unsubscribed 2");
  15. }

代码示例来源:origin: akarnokd/akarnokd-misc

  1. public static void main(String[] args) {
  2. Observable.range(1, 10)
  3. .doOnSubscribe(() -> System.out.println("Subscribed"))
  4. .publish(o -> Observable.zip(o.map(v -> v * 10), o.map(v -> v * 20), (a, b) -> a + "-" + b))
  5. .subscribe(System.out::println, Throwable::printStackTrace);
  6. }
  7. }

代码示例来源:origin: nurkiewicz/rxjava-book-examples

  1. @Test
  2. public void sample_206() throws Exception {
  3. final Observable<Status> tweets = status();
  4. ConnectableObservable<Status> published = tweets.publish();
  5. published.connect();
  6. }

代码示例来源:origin: akarnokd/akarnokd-misc

  1. static <T> Observable.Transformer<T, T> debounceFirst(long timeout, TimeUnit unit) {
  2. return f ->
  3. f.publish(g ->
  4. g.take(1)
  5. .concatWith(
  6. g.switchMap(u -> Observable.timer(timeout, unit).map(w -> u))
  7. .take(1)
  8. .ignoreElements()
  9. )
  10. .repeatWhen(h -> h.takeUntil(g.ignoreElements()))
  11. )
  12. ;
  13. }
  14. }

代码示例来源:origin: nurkiewicz/rxjava-book-examples

  1. @Test
  2. public void sample_48() throws Exception {
  3. ConnectableObservable<Long> upstream = Observable
  4. .interval(99, MILLISECONDS)
  5. .publish();
  6. upstream
  7. .debounce(100, MILLISECONDS)
  8. .timeout(1, SECONDS, upstream.take(1));
  9. upstream.connect();
  10. }

相关文章

Observable类方法