rx.Observable.share(): Usage and Code Examples

Reposted by x33g5p2x on 2022-01-25, filed under Other

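This article collects code examples of the rx.Observable.share() method in Java and shows how Observable.share() is used in practice. The examples were extracted from selected projects found on platforms such as GitHub, Stack Overflow, and Maven, so they should be a useful reference. Details of the Observable.share() method:
Package path: rx.Observable
Class name: Observable
Method name: share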

About Observable.share

Returns a new Observable that multicasts (shares) the original Observable. As long as there is at least one Subscriber, this Observable stays subscribed to the source and emits data. When all subscribers have unsubscribed, it unsubscribes from the source Observable.

This is an alias for publish().refCount() (see ConnectableObservable#refCount()).

Backpressure support: this operator does not support backpressure, because multicasting means the stream is "hot" with multiple subscribers. Each child needs to manage backpressure independently using operators such as onBackpressureDrop and onBackpressureBuffer.

Scheduler: share does not operate by default on a particular Scheduler.
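
The reference-counting behaviour described above can be illustrated with a small plain-Java model. This is only a sketch under stated assumptions: it does not use RxJava and is not how RxJava implements share(). The names RefCountModel, Source, Shared, and ManualSource are hypothetical and exist only for this illustration. The model connects to the upstream when the first listener arrives, fans each value out to every current listener, and disconnects when the last listener leaves.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Toy model of the publish().refCount() idea: one upstream connection shared by all listeners. */
final class RefCountModel {

    /** Hypothetical stand-in for a source; connect() returns a handle that disconnects. */
    interface Source<T> {
        Runnable connect(Consumer<T> sink);
    }

    /** Shares a single upstream connection among all current listeners. */
    static final class Shared<T> {
        private final Source<T> source;
        private final List<Consumer<T>> listeners = new ArrayList<>();
        private Runnable upstream; // non-null only while connected

        Shared(Source<T> source) {
            this.source = source;
        }

        /** Adds a listener; the first listener triggers the upstream connection. */
        synchronized Runnable subscribe(Consumer<T> listener) {
            listeners.add(listener);
            if (listeners.size() == 1) {
                upstream = source.connect(this::emit);
            }
            return () -> unsubscribe(listener);
        }

        /** Removes a listener; the last one to leave disconnects the upstream. */
        private synchronized void unsubscribe(Consumer<T> listener) {
            if (listeners.remove(listener) && listeners.isEmpty()) {
                upstream.run();
                upstream = null;
            }
        }

        /** Multicasts one value to a snapshot of the current listeners. */
        private synchronized void emit(T value) {
            for (Consumer<T> l : new ArrayList<>(listeners)) {
                l.accept(value);
            }
        }
    }

    /** Hypothetical source driven by hand, counting connections and disconnections. */
    static final class ManualSource implements Source<Integer> {
        int connects = 0;
        int disconnects = 0;
        private Consumer<Integer> sink;

        @Override
        public Runnable connect(Consumer<Integer> sink) {
            connects++;
            this.sink = sink;
            return () -> {
                disconnects++;
                this.sink = null;
            };
        }

        /** Emits a value if something is connected; otherwise the value is lost. */
        void push(int value) {
            if (sink != null) {
                sink.accept(value);
            }
        }
    }

    static String demo() {
        ManualSource src = new ManualSource();
        Shared<Integer> shared = new Shared<>(src);
        List<String> log = new ArrayList<>();

        Runnable a = shared.subscribe(v -> log.add("A" + v)); // first listener: connects
        Runnable b = shared.subscribe(v -> log.add("B" + v)); // reuses the connection
        src.push(1);
        a.run();                                              // B is still listening
        src.push(2);
        b.run();                                              // last listener: disconnects
        src.push(3);                                          // nobody connected, value lost
        Runnable c = shared.subscribe(v -> log.add("C" + v)); // connects again
        src.push(4);
        c.run();

        return log + " connects=" + src.connects + " disconnects=" + src.disconnects;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

In the model, two overlapping listeners cause only one connection, and a listener that arrives after everyone has left causes a fresh connection. With RxJava 1.x the corresponding call is simply source.share() on an rx.Observable.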

Code examples

Source: PipelineAI/pipeline (identical code also appears in com.netflix.hystrix/hystrix-core)

    protected RollingConcurrencyStream(final HystrixEventStream<HystrixCommandExecutionStarted> inputEventStream, final int numBuckets, final int bucketSizeInMs) {
        final List<Integer> emptyRollingMaxBuckets = new ArrayList<Integer>();
        for (int i = 0; i < numBuckets; i++) {
            emptyRollingMaxBuckets.add(0);
        }
        rollingMaxStream = inputEventStream
                .observe()
                .map(getConcurrencyCountFromEvent)
                .window(bucketSizeInMs, TimeUnit.MILLISECONDS)
                .flatMap(reduceStreamToMax)
                .startWith(emptyRollingMaxBuckets)
                .window(numBuckets, 1)
                .flatMap(reduceStreamToMax)
                .share()                 // all subscribers reuse one upstream subscription
                .onBackpressureDrop();   // a slow subscriber drops values instead of buffering
    }

Source: PipelineAI/pipeline

    protected RollingDistributionStream(final HystrixEventStream<Event> stream, final int numBuckets, final int bucketSizeInMs,
                                        final Func2<Histogram, Event, Histogram> addValuesToBucket) {
        final List<Histogram> emptyDistributionsToStart = new ArrayList<Histogram>();
        for (int i = 0; i < numBuckets; i++) {
            emptyDistributionsToStart.add(CachedValuesHistogram.getNewHistogram());
        }
        final Func1<Observable<Event>, Observable<Histogram>> reduceBucketToSingleDistribution = new Func1<Observable<Event>, Observable<Histogram>>() {
            @Override
            public Observable<Histogram> call(Observable<Event> bucket) {
                return bucket.reduce(CachedValuesHistogram.getNewHistogram(), addValuesToBucket);
            }
        };
        rollingDistributionStream = stream
                .observe()
                .window(bucketSizeInMs, TimeUnit.MILLISECONDS) //stream of unaggregated buckets
                .flatMap(reduceBucketToSingleDistribution)     //stream of aggregated Histograms
                .startWith(emptyDistributionsToStart)          //stream of aggregated Histograms that starts with n empty
                .window(numBuckets, 1)                         //windowed stream: each OnNext is a stream of n Histograms
                .flatMap(reduceWindowToSingleDistribution)     //reduced stream: each OnNext is a single Histogram
                .map(cacheHistogramValues)                     //convert to CachedValueHistogram (commonly-accessed values are cached)
                .share()
                .onBackpressureDrop();
    }

Source: PipelineAI/pipeline

    /**
     * @deprecated Not for public use. Please use {@link #getInstance()}. This facilitates better stream-sharing
     * @param intervalInMilliseconds milliseconds between data emissions
     */
    @Deprecated //deprecated in 1.5.4.
    public HystrixUtilizationStream(final int intervalInMilliseconds) {
        this.intervalInMilliseconds = intervalInMilliseconds;
        this.allUtilizationStream = Observable.interval(intervalInMilliseconds, TimeUnit.MILLISECONDS)
                .map(getAllUtilization)
                .doOnSubscribe(new Action0() {
                    @Override
                    public void call() {
                        isSourceCurrentlySubscribed.set(true);
                    }
                })
                .doOnUnsubscribe(new Action0() {
                    @Override
                    public void call() {
                        isSourceCurrentlySubscribed.set(false);
                    }
                })
                .share()
                .onBackpressureDrop();
    }

Source: PipelineAI/pipeline

    /**
     * @deprecated Not for public use. Please use {@link #getInstance()}. This facilitates better stream-sharing
     * @param intervalInMilliseconds milliseconds between data emissions
     */
    @Deprecated //deprecated in 1.5.4.
    public HystrixConfigurationStream(final int intervalInMilliseconds) {
        this.intervalInMilliseconds = intervalInMilliseconds;
        this.allConfigurationStream = Observable.interval(intervalInMilliseconds, TimeUnit.MILLISECONDS)
                .map(getAllConfig)
                .doOnSubscribe(new Action0() {
                    @Override
                    public void call() {
                        isSourceCurrentlySubscribed.set(true);
                    }
                })
                .doOnUnsubscribe(new Action0() {
                    @Override
                    public void call() {
                        isSourceCurrentlySubscribed.set(false);
                    }
                })
                .share()
                .onBackpressureDrop();
    }

Source: PipelineAI/pipeline

    protected BucketedRollingCounterStream(HystrixEventStream<Event> stream, final int numBuckets, int bucketSizeInMs,
                                           final Func2<Bucket, Event, Bucket> appendRawEventToBucket,
                                           final Func2<Output, Bucket, Output> reduceBucket) {
        super(stream, numBuckets, bucketSizeInMs, appendRawEventToBucket);
        Func1<Observable<Bucket>, Observable<Output>> reduceWindowToSummary = new Func1<Observable<Bucket>, Observable<Output>>() {
            @Override
            public Observable<Output> call(Observable<Bucket> window) {
                return window.scan(getEmptyOutputValue(), reduceBucket).skip(numBuckets);
            }
        };
        this.sourceStream = bucketedStream      //stream broken up into buckets
                .window(numBuckets, 1)          //emit overlapping windows of buckets
                .flatMap(reduceWindowToSummary) //convert a window of bucket-summaries into a single summary
                .doOnSubscribe(new Action0() {
                    @Override
                    public void call() {
                        isSourceCurrentlySubscribed.set(true);
                    }
                })
                .doOnUnsubscribe(new Action0() {
                    @Override
                    public void call() {
                        isSourceCurrentlySubscribed.set(false);
                    }
                })
                .share()                        //multiple subscribers should get same data
                .onBackpressureDrop();          //if there are slow consumers, data should not buffer
    }

Source: PipelineAI/pipeline

    private HystrixDashboardStream(int delayInMs) {
        this.delayInMs = delayInMs;
        this.singleSource = Observable.interval(delayInMs, TimeUnit.MILLISECONDS)
                .map(new Func1<Long, DashboardData>() {
                    @Override
                    public DashboardData call(Long timestamp) {
                        return new DashboardData(
                                HystrixCommandMetrics.getInstances(),
                                HystrixThreadPoolMetrics.getInstances(),
                                HystrixCollapserMetrics.getInstances()
                        );
                    }
                })
                .doOnSubscribe(new Action0() {
                    @Override
                    public void call() {
                        isSourceCurrentlySubscribed.set(true);
                    }
                })
                .doOnUnsubscribe(new Action0() {
                    @Override
                    public void call() {
                        isSourceCurrentlySubscribed.set(false);
                    }
                })
                .share()
                .onBackpressureDrop();
    }

Source: PipelineAI/pipeline

    protected BucketedCumulativeCounterStream(HystrixEventStream<Event> stream, int numBuckets, int bucketSizeInMs,
                                              Func2<Bucket, Event, Bucket> reduceCommandCompletion,
                                              Func2<Output, Bucket, Output> reduceBucket) {
        super(stream, numBuckets, bucketSizeInMs, reduceCommandCompletion);
        this.sourceStream = bucketedStream
                .scan(getEmptyOutputValue(), reduceBucket)
                .skip(numBuckets)
                .doOnSubscribe(new Action0() {
                    @Override
                    public void call() {
                        isSourceCurrentlySubscribed.set(true);
                    }
                })
                .doOnUnsubscribe(new Action0() {
                    @Override
                    public void call() {
                        isSourceCurrentlySubscribed.set(false);
                    }
                })
                .share()                //multiple subscribers should get same data
                .onBackpressureDrop();  //if there are slow consumers, data should not buffer
    }

Source: THEONE10211024/RxJavaSamples

    @Override
    public void onStart() {
        super.onStart();
        _subscriptions = new CompositeSubscription();
        // share() lets the three subscriptions below reuse one subscription to the bus
        Observable<Object> tapEventEmitter = _rxBus.toObserverable().share();
        _subscriptions//
                .add(tapEventEmitter.subscribe(new Action1<Object>() {
                    @Override
                    public void call(Object event) {
                        if (event instanceof RxBusDemoFragment.TapEvent) {
                            _showTapText();
                        }
                    }
                }));
        Observable<Object> debouncedEmitter = tapEventEmitter.debounce(1, TimeUnit.SECONDS);
        Observable<List<Object>> debouncedBufferEmitter = tapEventEmitter.buffer(debouncedEmitter);
        _subscriptions//
                .add(debouncedBufferEmitter//
                        .observeOn(AndroidSchedulers.mainThread())//
                        .subscribe(new Action1<List<Object>>() {
                            @Override
                            public void call(List<Object> taps) {
                                _showTapCount(taps.size());
                            }
                        }));
    }

Source: HubSpot/Singularity

    final Observable<Event> events = unicastEvents.share();

Source: com.netflix.eureka/eureka2-server

    protected Observable<InstanceInfo.Builder> resolveMutable() {
        return resultObservable.share();
    }

Source: hanks-zyh/RxSerach

    public static Observable<Boolean> fromConnectivityManager(Context context) {
        return Observable.create(new BroadcastObservable(context)).share();
    }

Source: akarnokd/akarnokd-misc

    public static void main(String[] args) {
        System.out.println(Observable.interval(1, 1, TimeUnit.MILLISECONDS)
                .take(2000)
                .share()
                .onBackpressureLatest()
                .subscribeOn(Schedulers.io())
                .observeOn(Schedulers.computation(), 16)
                .doOnNext(System.out::println)
                .toList()
                .toBlocking()
                .last().size());
    }

Source: com.microsoft.azure/spring-integration-eventhub (identical code also appears in Microsoft/spring-cloud-azure)

    @Override
    public Observable<Message<?>> subscribe(String destination, String consumerGroup, Class<?> messagePayloadType) {
        Tuple<String, String> nameAndConsumerGroup = Tuple.of(destination, consumerGroup);
        // one shared Observable is created and cached per (destination, consumerGroup) pair
        subjectByNameAndGroup.computeIfAbsent(nameAndConsumerGroup, k -> Observable.<Message<?>>create(subscriber -> {
            this.register(destination, consumerGroup,
                    new EventHubProcessor(subscriber::onNext, messagePayloadType, getCheckpointConfig(),
                            getMessageConverter()));
            subscriber.add(Subscriptions.create(() -> unregister(destination, consumerGroup)));
        }).share());
        return subjectByNameAndGroup.get(nameAndConsumerGroup);
    }

Source: au.gov.amsa.risky/streams

    public static Observable<String> from(Func0<Socket> socketCreator, long quietTimeoutMs,
            long reconnectDelayMs, Charset charset, Scheduler scheduler) {
        return strings(socketCreator, charset) //
                // additional timeout appears to be necessary for certain use
                // cases like when the server side does not close the socket
                .timeout(quietTimeoutMs + 100, TimeUnit.MILLISECONDS) //
                .subscribeOn(scheduler) //
                // if any exception occurs retry
                .retryWhen(RetryWhen //
                        .delay(reconnectDelayMs, TimeUnit.MILLISECONDS) //
                        .build()) //
                // all subscribers use the same stream
                .share();
    }

Source: com.netflix.eureka/eureka2-server

    public CachingSelfInfoResolver(SelfInfoResolver delegate) {
        this.delegateObservable = delegate.resolve();
        this.cachingSubject = BehaviorSubject.create();
        control = Observable.<InstanceInfo>never()
                .doOnSubscribe(new Action0() {
                    @Override
                    public void call() {
                        subscription = delegateObservable.subscribe(cachingSubject);
                    }
                })
                .doOnUnsubscribe(new Action0() {
                    @Override
                    public void call() {
                        if (subscription != null) {
                            subscription.unsubscribe();
                        }
                    }
                })
                .share();
    }

Source: nurkiewicz/rxjava-book-examples

    Observable<PGNotification> observe(String channel, long pollingPeriod) {
        return Observable.<PGNotification>create(subscriber -> {
            try {
                Connection connection = DriverManager
                        .getConnection("jdbc:postgresql:db");
                subscriber.add(Subscriptions.create(() ->
                        closeQuietly(connection)));
                listenOn(connection, channel);
                Jdbc42Connection pgConn = (Jdbc42Connection) connection;
                pollForNotifications(pollingPeriod, pgConn)
                        .subscribe(Subscribers.wrap(subscriber));
            } catch (Exception e) {
                subscriber.onError(e);
            }
        }).share();
    }
