rx.Observable.share()方法的使用及代码示例

x33g5p2x  于2022-01-25 转载在 其他  
字(13.2k)|赞(0)|评价(0)|浏览(137)

本文整理了Java中rx.Observable.share()方法的一些代码示例,展示了Observable.share()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Observable.share()方法的具体详情如下:
包路径:rx.Observable
类名称:Observable
方法名:share

Observable.share介绍

[英]Returns a new Observable that multicasts (shares) the original Observable. As long as there is more than one Subscriber this Observable will be subscribed and emitting data. When all subscribers have unsubscribed it will unsubscribe from the source Observable.

This is an alias for #publish(). ConnectableObservable#refCount().

Backpressure Support: This operator does not support backpressure because multicasting means the stream is "hot" with multiple subscribers. Each child will need to manage backpressure independently using operators such as #onBackpressureDrop and #onBackpressureBuffer. Scheduler: share does not operate by default on a particular Scheduler.
[中]返回多播(共享)原始可观察对象的新可观察对象。只要有一个以上的订户,这个可观察的将被订阅并发送数据。当所有订户都取消订阅时,它将从可观察的源取消订阅。
这是#publish()的别名。可连接可观察#refCount()。
背压支持:该操作员不支持背压,因为多播意味着流对于多个订户来说是“热的”。每个孩子都需要使用#onBackpressureDrop和#onBackpressureBuffer等操作符独立管理背压。调度程序:共享默认情况下不会在特定调度程序上运行。

代码示例

代码示例来源:origin: PipelineAI/pipeline

protected RollingConcurrencyStream(final HystrixEventStream<HystrixCommandExecutionStarted> inputEventStream, final int numBuckets, final int bucketSizeInMs) {
  final List<Integer> emptyRollingMaxBuckets = new ArrayList<Integer>();
  for (int i = 0; i < numBuckets; i++) {
    emptyRollingMaxBuckets.add(0);
  }
  rollingMaxStream = inputEventStream
      .observe()
      .map(getConcurrencyCountFromEvent)
      .window(bucketSizeInMs, TimeUnit.MILLISECONDS)
      .flatMap(reduceStreamToMax)
      .startWith(emptyRollingMaxBuckets)
      .window(numBuckets, 1)
      .flatMap(reduceStreamToMax)
      .share()
      .onBackpressureDrop();
}

代码示例来源:origin: PipelineAI/pipeline

protected RollingDistributionStream(final HystrixEventStream<Event> stream, final int numBuckets, final int bucketSizeInMs,
                  final Func2<Histogram, Event, Histogram> addValuesToBucket) {
  final List<Histogram> emptyDistributionsToStart = new ArrayList<Histogram>();
  for (int i = 0; i < numBuckets; i++) {
    emptyDistributionsToStart.add(CachedValuesHistogram.getNewHistogram());
  }
  final Func1<Observable<Event>, Observable<Histogram>> reduceBucketToSingleDistribution = new Func1<Observable<Event>, Observable<Histogram>>() {
    @Override
    public Observable<Histogram> call(Observable<Event> bucket) {
      return bucket.reduce(CachedValuesHistogram.getNewHistogram(), addValuesToBucket);
    }
  };
  rollingDistributionStream = stream
      .observe()
      .window(bucketSizeInMs, TimeUnit.MILLISECONDS) //stream of unaggregated buckets
      .flatMap(reduceBucketToSingleDistribution)     //stream of aggregated Histograms
      .startWith(emptyDistributionsToStart)          //stream of aggregated Histograms that starts with n empty
      .window(numBuckets, 1)                         //windowed stream: each OnNext is a stream of n Histograms
      .flatMap(reduceWindowToSingleDistribution)     //reduced stream: each OnNext is a single Histogram
      .map(cacheHistogramValues)                     //convert to CachedValueHistogram (commonly-accessed values are cached)
      .share()
      .onBackpressureDrop();
}

代码示例来源:origin: PipelineAI/pipeline

/**
 * @deprecated Not for public use.  Please use {@link #getInstance()}.  This facilitates better stream-sharing
 * @param intervalInMilliseconds milliseconds between data emissions
 */
@Deprecated //deprecated in 1.5.4.
public HystrixUtilizationStream(final int intervalInMilliseconds) {
  this.intervalInMilliseconds = intervalInMilliseconds;
  this.allUtilizationStream = Observable.interval(intervalInMilliseconds, TimeUnit.MILLISECONDS)
      .map(getAllUtilization)
      .doOnSubscribe(new Action0() {
        @Override
        public void call() {
          isSourceCurrentlySubscribed.set(true);
        }
      })
      .doOnUnsubscribe(new Action0() {
        @Override
        public void call() {
          isSourceCurrentlySubscribed.set(false);
        }
      })
      .share()
      .onBackpressureDrop();
}

代码示例来源:origin: PipelineAI/pipeline

/**
 * @deprecated Not for public use.  Please use {@link #getInstance()}.  This facilitates better stream-sharing
 * @param intervalInMilliseconds milliseconds between data emissions
 */
@Deprecated //deprecated in 1.5.4.
public HystrixConfigurationStream(final int intervalInMilliseconds) {
  this.intervalInMilliseconds = intervalInMilliseconds;
  this.allConfigurationStream = Observable.interval(intervalInMilliseconds, TimeUnit.MILLISECONDS)
      .map(getAllConfig)
      .doOnSubscribe(new Action0() {
        @Override
        public void call() {
          isSourceCurrentlySubscribed.set(true);
        }
      })
      .doOnUnsubscribe(new Action0() {
        @Override
        public void call() {
          isSourceCurrentlySubscribed.set(false);
        }
      })
      .share()
      .onBackpressureDrop();
}

代码示例来源:origin: PipelineAI/pipeline

protected BucketedRollingCounterStream(HystrixEventStream<Event> stream, final int numBuckets, int bucketSizeInMs,
                    final Func2<Bucket, Event, Bucket> appendRawEventToBucket,
                    final Func2<Output, Bucket, Output> reduceBucket) {
  super(stream, numBuckets, bucketSizeInMs, appendRawEventToBucket);
  Func1<Observable<Bucket>, Observable<Output>> reduceWindowToSummary = new Func1<Observable<Bucket>, Observable<Output>>() {
    @Override
    public Observable<Output> call(Observable<Bucket> window) {
      return window.scan(getEmptyOutputValue(), reduceBucket).skip(numBuckets);
    }
  };
  this.sourceStream = bucketedStream      //stream broken up into buckets
      .window(numBuckets, 1)          //emit overlapping windows of buckets
      .flatMap(reduceWindowToSummary) //convert a window of bucket-summaries into a single summary
      .doOnSubscribe(new Action0() {
        @Override
        public void call() {
          isSourceCurrentlySubscribed.set(true);
        }
      })
      .doOnUnsubscribe(new Action0() {
        @Override
        public void call() {
          isSourceCurrentlySubscribed.set(false);
        }
      })
      .share()                        //multiple subscribers should get same data
      .onBackpressureDrop();          //if there are slow consumers, data should not buffer
}

代码示例来源:origin: PipelineAI/pipeline

private HystrixDashboardStream(int delayInMs) {
  this.delayInMs = delayInMs;
  this.singleSource = Observable.interval(delayInMs, TimeUnit.MILLISECONDS)
      .map(new Func1<Long, DashboardData>() {
        @Override
        public DashboardData call(Long timestamp) {
          return new DashboardData(
              HystrixCommandMetrics.getInstances(),
              HystrixThreadPoolMetrics.getInstances(),
              HystrixCollapserMetrics.getInstances()
          );
        }
      })
      .doOnSubscribe(new Action0() {
        @Override
        public void call() {
          isSourceCurrentlySubscribed.set(true);
        }
      })
      .doOnUnsubscribe(new Action0() {
        @Override
        public void call() {
          isSourceCurrentlySubscribed.set(false);
        }
      })
      .share()
      .onBackpressureDrop();
}

代码示例来源:origin: PipelineAI/pipeline

protected BucketedCumulativeCounterStream(HystrixEventStream<Event> stream, int numBuckets, int bucketSizeInMs,
                     Func2<Bucket, Event, Bucket> reduceCommandCompletion,
                     Func2<Output, Bucket, Output> reduceBucket) {
  super(stream, numBuckets, bucketSizeInMs, reduceCommandCompletion);
  this.sourceStream = bucketedStream
      .scan(getEmptyOutputValue(), reduceBucket)
      .skip(numBuckets)
      .doOnSubscribe(new Action0() {
        @Override
        public void call() {
          isSourceCurrentlySubscribed.set(true);
        }
      })
      .doOnUnsubscribe(new Action0() {
        @Override
        public void call() {
          isSourceCurrentlySubscribed.set(false);
        }
      })
      .share()                        //multiple subscribers should get same data
      .onBackpressureDrop();          //if there are slow consumers, data should not buffer
}

代码示例来源:origin: THEONE10211024/RxJavaSamples

@Override
public void onStart() {
  super.onStart();
  _subscriptions = new CompositeSubscription();
  Observable<Object> tapEventEmitter = _rxBus.toObserverable().share();
  _subscriptions//
     .add(tapEventEmitter.subscribe(new Action1<Object>() {
       @Override
       public void call(Object event) {
         if (event instanceof RxBusDemoFragment.TapEvent) {
           _showTapText();
         }
       }
     }));
  Observable<Object> debouncedEmitter = tapEventEmitter.debounce(1, TimeUnit.SECONDS);
  Observable<List<Object>> debouncedBufferEmitter = tapEventEmitter.buffer(debouncedEmitter);
  _subscriptions//
     .add(debouncedBufferEmitter//
        .observeOn(AndroidSchedulers.mainThread())//
        .subscribe(new Action1<List<Object>>() {
          @Override
          public void call(List<Object> taps) {
            _showTapCount(taps.size());
          }
        }));
}

代码示例来源:origin: HubSpot/Singularity

final Observable<Event> events = unicastEvents.share();

代码示例来源:origin: com.netflix.eureka/eureka2-server

protected Observable<InstanceInfo.Builder> resolveMutable() {
    return resultObservable.share();
  }
}

代码示例来源:origin: hanks-zyh/RxSerach

public static Observable<Boolean> fromConnectivityManager(Context context) {
  return Observable.create(new BroadcastObservable(context)).share();
}

代码示例来源:origin: com.netflix.hystrix/hystrix-core

protected RollingConcurrencyStream(final HystrixEventStream<HystrixCommandExecutionStarted> inputEventStream, final int numBuckets, final int bucketSizeInMs) {
  final List<Integer> emptyRollingMaxBuckets = new ArrayList<Integer>();
  for (int i = 0; i < numBuckets; i++) {
    emptyRollingMaxBuckets.add(0);
  }
  rollingMaxStream = inputEventStream
      .observe()
      .map(getConcurrencyCountFromEvent)
      .window(bucketSizeInMs, TimeUnit.MILLISECONDS)
      .flatMap(reduceStreamToMax)
      .startWith(emptyRollingMaxBuckets)
      .window(numBuckets, 1)
      .flatMap(reduceStreamToMax)
      .share()
      .onBackpressureDrop();
}

代码示例来源:origin: akarnokd/akarnokd-misc

public static void main(String[] args) {
    System.out.println(Observable.interval(1, 1, TimeUnit.MILLISECONDS)
    .take(2000)
    .share()
    .onBackpressureLatest()
    .subscribeOn(Schedulers.io())
    .observeOn(Schedulers.computation(), 16)
    .doOnNext(System.out::println)
    .toList()
    .toBlocking()
    .last().size());
  }
}

代码示例来源:origin: com.microsoft.azure/spring-integration-eventhub

@Override
  public Observable<Message<?>> subscribe(String destination, String consumerGroup, Class<?> messagePayloadType) {
    Tuple<String, String> nameAndConsumerGroup = Tuple.of(destination, consumerGroup);

    subjectByNameAndGroup.computeIfAbsent(nameAndConsumerGroup, k -> Observable.<Message<?>>create(subscriber -> {
      this.register(destination, consumerGroup,
          new EventHubProcessor(subscriber::onNext, messagePayloadType, getCheckpointConfig(),
              getMessageConverter()));
      subscriber.add(Subscriptions.create(() -> unregister(destination, consumerGroup)));
    }).share());

    return subjectByNameAndGroup.get(nameAndConsumerGroup);
  }
}

代码示例来源:origin: com.microsoft.azure/spring-integration-eventhub

@Override
public Observable<Message<?>> subscribe(String destination, String consumerGroup, Class<?> messagePayloadType) {
  Tuple<String, String> nameAndConsumerGroup = Tuple.of(destination, consumerGroup);
  subjectByNameAndGroup.computeIfAbsent(nameAndConsumerGroup, k -> Observable.<Message<?>>create(subscriber -> {
    this.register(destination, consumerGroup,
        new EventHubProcessor(subscriber::onNext, messagePayloadType, getCheckpointConfig(),
            getMessageConverter()));
    subscriber.add(Subscriptions.create(() -> unregister(destination, consumerGroup)));
  }).share());
  return subjectByNameAndGroup.get(nameAndConsumerGroup);
}

代码示例来源:origin: Microsoft/spring-cloud-azure

@Override
public Observable<Message<?>> subscribe(String destination, String consumerGroup, Class<?> messagePayloadType) {
  Tuple<String, String> nameAndConsumerGroup = Tuple.of(destination, consumerGroup);
  subjectByNameAndGroup.computeIfAbsent(nameAndConsumerGroup, k -> Observable.<Message<?>>create(subscriber -> {
    this.register(destination, consumerGroup,
        new EventHubProcessor(subscriber::onNext, messagePayloadType, getCheckpointConfig(),
            getMessageConverter()));
    subscriber.add(Subscriptions.create(() -> unregister(destination, consumerGroup)));
  }).share());
  return subjectByNameAndGroup.get(nameAndConsumerGroup);
}

代码示例来源:origin: Microsoft/spring-cloud-azure

@Override
  public Observable<Message<?>> subscribe(String destination, String consumerGroup, Class<?> messagePayloadType) {
    Tuple<String, String> nameAndConsumerGroup = Tuple.of(destination, consumerGroup);

    subjectByNameAndGroup.computeIfAbsent(nameAndConsumerGroup, k -> Observable.<Message<?>>create(subscriber -> {
      this.register(destination, consumerGroup,
          new EventHubProcessor(subscriber::onNext, messagePayloadType, getCheckpointConfig(),
              getMessageConverter()));
      subscriber.add(Subscriptions.create(() -> unregister(destination, consumerGroup)));
    }).share());

    return subjectByNameAndGroup.get(nameAndConsumerGroup);
  }
}

代码示例来源:origin: au.gov.amsa.risky/streams

public static Observable<String> from(Func0<Socket> socketCreator, long quietTimeoutMs,
    long reconnectDelayMs, Charset charset, Scheduler scheduler) {
  return strings(socketCreator, charset) //
      // additional timeout appears to be necessary for certain use
      // cases like when the server side does not close the socket
      .timeout(quietTimeoutMs + 100, TimeUnit.MILLISECONDS) //
      .subscribeOn(scheduler) //
      // if any exception occurs retry
      .retryWhen(RetryWhen //
          .delay(reconnectDelayMs, TimeUnit.MILLISECONDS) //
          .build()) //
      // all subscribers use the same stream
      .share();
}

代码示例来源:origin: com.netflix.eureka/eureka2-server

public CachingSelfInfoResolver(SelfInfoResolver delegate) {
  this.delegateObservable = delegate.resolve();
  this.cachingSubject = BehaviorSubject.create();
  control = Observable.<InstanceInfo>never()
      .doOnSubscribe(new Action0() {
        @Override
        public void call() {
          subscription = delegateObservable.subscribe(cachingSubject);
        }
      })
      .doOnUnsubscribe(new Action0() {
        @Override
        public void call() {
          if (subscription != null) {
            subscription.unsubscribe();
          }
        }
      })
      .share();
}

代码示例来源:origin: nurkiewicz/rxjava-book-examples

Observable<PGNotification> observe(String channel, long pollingPeriod) {
  return Observable.<PGNotification>create(subscriber -> {
    try {
      Connection connection = DriverManager
          .getConnection("jdbc:postgresql:db");
      subscriber.add(Subscriptions.create(() ->
          closeQuietly(connection)));
      listenOn(connection, channel);
      Jdbc42Connection pgConn = (Jdbc42Connection) connection;
      pollForNotifications(pollingPeriod, pgConn)
          .subscribe(Subscribers.wrap(subscriber));
    } catch (Exception e) {
      subscriber.onError(e);
    }
  }).share();
}

相关文章

Observable类方法