Usage and code examples for the io.reactivex.Observable.share() method

Reposted by x33g5p2x on 2022-01-25 in Other

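This article collects code examples for the Java method io.reactivex.Observable.share() and shows how it is used in practice. The examples were extracted from selected projects found mainly on GitHub, Stack Overflow, and Maven, and are intended as a practical reference. Details of Observable.share() are as follows:
Package path: io.reactivex.Observable
Class name: Observable
Method name: share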

Introduction to Observable.share

Returns a new ObservableSource that multicasts (and shares a single subscription to) the original ObservableSource. As long as there is at least one Observer, this ObservableSource will be subscribed and emitting data. When all subscribers have disposed, it will dispose the source ObservableSource.

This is an alias for publish().refCount() (see ConnectableObservable#refCount()).

Scheduler: share does not operate by default on a particular Scheduler.
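The behavior described above can be tried out with a small sketch. It assumes RxJava 2.x (the io.reactivex package) is on the classpath; the class name ShareDemo, the run() helper, and the log labels are hypothetical names chosen for illustration, and a PublishSubject stands in for a real event source.

```java
import io.reactivex.Observable;
import io.reactivex.disposables.Disposable;
import io.reactivex.subjects.PublishSubject;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class ShareDemo {

    /** Runs the scenario and returns a log of what happened, in order. */
    static List<String> run() {
        List<String> log = new ArrayList<>();
        AtomicInteger upstreamSubscriptions = new AtomicInteger();

        // Hypothetical stand-in for a real event stream; values are pushed by hand.
        PublishSubject<Integer> source = PublishSubject.create();

        Observable<Integer> shared = source
                // Records every time the upstream itself is subscribed to.
                .doOnSubscribe(d -> log.add("upstream subscribed #" + upstreamSubscriptions.incrementAndGet()))
                // Records every time the upstream subscription is disposed.
                .doOnDispose(() -> log.add("upstream disposed"))
                .share();

        // Two observers, but only one upstream subscription.
        Disposable a = shared.subscribe(v -> log.add("A got " + v));
        Disposable b = shared.subscribe(v -> log.add("B got " + v));
        source.onNext(1);

        // One observer leaves; the upstream stays subscribed for the other.
        a.dispose();
        source.onNext(2);

        // The last observer leaves; share() disposes the upstream.
        b.dispose();

        // A later observer triggers a fresh upstream subscription.
        Disposable c = shared.subscribe(v -> log.add("C got " + v));
        source.onNext(3);
        c.dispose();

        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

In this sketch the upstream is subscribed once for observers A and B, disposed when B (the last observer) leaves, and subscribed a second time when C arrives. Replacing share() with publish().refCount() should give the same log, since share() is documented as an alias for that pair.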

Code examples

Code example source: trello/RxLifecycle

    /**
     * Binds the given source to a lifecycle.
     * <p>
     * This method determines (based on the lifecycle sequence itself) when the source
     * should stop emitting items. It uses the provided correspondingEvents function to determine
     * when to unsubscribe.
     * <p>
     * Note that this is an advanced usage of the library and should generally be used only if you
     * really know what you're doing with a given lifecycle.
     *
     * @param lifecycle the lifecycle sequence
     * @param correspondingEvents a function which tells the source when to unsubscribe
     * @return a reusable {@link LifecycleTransformer} that unsubscribes the source during the Fragment lifecycle
     */
    @Nonnull
    @CheckReturnValue
    public static <T, R> LifecycleTransformer<T> bind(@Nonnull Observable<R> lifecycle,
                                                      @Nonnull final Function<R, R> correspondingEvents) {
        checkNotNull(lifecycle, "lifecycle == null");
        checkNotNull(correspondingEvents, "correspondingEvents == null");
        // Share the lifecycle so downstream consumers reuse a single upstream subscription.
        return bind(takeUntilCorrespondingEvent(lifecycle.share(), correspondingEvents));
    }

Code example source: alibaba/Tangram-Android

    public RxTimer(long interval) {
        this.mInterval = interval;
        this.mStatus = TimerStatus.Waiting;
        this.mIntervalObservable = Observable
                .interval(0, this.mInterval, TimeUnit.MILLISECONDS)
                .doOnSubscribe(new Consumer<Disposable>() {
                    @Override
                    public void accept(Disposable disposable) throws Exception {
                        mStatus = TimerStatus.Running;
                        Log.d("RxTimerSupportTest", "accept " + disposable + " status " + mStatus);
                    }
                })
                .doOnDispose(new Action() {
                    @Override
                    public void run() throws Exception {
                        mStatus = TimerStatus.Paused;
                        Log.d("RxTimerSupportTest", "on dispose " + " status " + mStatus);
                    }
                })
                .doOnTerminate(new Action() {
                    @Override
                    public void run() throws Exception {
                        mStatus = TimerStatus.Stopped;
                        Log.d("RxTimerSupportTest", "on terminate " + " status " + mStatus);
                    }
                })
                // One interval timer is shared by all observers.
                .share();
    }

Code example source: alibaba/Tangram-Android

    public static <T, E> LifecycleTransformer<T> bindToLifeCycle(Observable<E> lifecycle,
                                                                 final Function<E, E> correspondingEvents) {
        Observable<E> lifecycleCopy = lifecycle.share();
        return new LifecycleTransformer<>(Observable.combineLatest(lifecycle.take(1).map(correspondingEvents),
                lifecycleCopy.skip(1),
                new BiFunction<E, E, Boolean>() {
                    @Override
                    public Boolean apply(E e, E e2) throws Exception {
                        return e.equals(e2);
                    }
                }).filter(new Predicate<Boolean>() {
                    @Override
                    public boolean test(Boolean cmpResult) throws Exception {
                        return cmpResult;
                    }
                }));
    }

Code example source: Polidea/RxAndroidBle

    private Observable<RxBleScanResult> createScanOperationApi18(@Nullable final UUID[] filterServiceUUIDs) {
        final Set<UUID> filteredUUIDs = uuidUtil.toDistinctSet(filterServiceUUIDs);
        final LegacyScanOperation
                scanOperation = new LegacyScanOperation(filterServiceUUIDs, rxBleAdapterWrapper, uuidUtil);
        return operationQueue.queue(scanOperation)
                .doFinally(new Action() {
                    @Override
                    public void run() throws Exception {
                        synchronized (queuedScanOperations) {
                            queuedScanOperations.remove(filteredUUIDs);
                        }
                    }
                })
                .mergeWith(this.<RxBleInternalScanResultLegacy>bluetoothAdapterOffExceptionObservable())
                .map(new Function<RxBleInternalScanResultLegacy, RxBleScanResult>() {
                    @Override
                    public RxBleScanResult apply(RxBleInternalScanResultLegacy scanResult) {
                        return convertToPublicScanResult(scanResult);
                    }
                })
                // All observers of this scan share one queued scan operation.
                .share();
    }

Code example source: ReactiveX/RxJava (excerpt)

        sourceUnsubscribed.set(true);
    }).share();

Code example source: ReactiveX/RxJava (excerpt)

    .share()

Code example source: Polidea/RxAndroidBle

    final Observable<Boolean> sharedConnectButtonClicks = activatedClicksObservable(connectButton).share();
    final Observable<Boolean> sharedNotifyButtonClicks = activatedClicksObservable(notifyButton).share();
    final Observable<Boolean> sharedIndicateButtonClicks = activatedClicksObservable(indicateButton).share();

Code example source: f2prateek/rx-preferences

    private RxSharedPreferences(final SharedPreferences preferences) {
        this.preferences = preferences;
        this.keyChanges = Observable.create(new ObservableOnSubscribe<String>() {
            @Override public void subscribe(final ObservableEmitter<String> emitter) throws Exception {
                final OnSharedPreferenceChangeListener listener = new OnSharedPreferenceChangeListener() {
                    @Override
                    public void onSharedPreferenceChanged(SharedPreferences preferences, String key) {
                        emitter.onNext(key);
                    }
                };
                // Unregister the listener when the shared subscription is disposed.
                emitter.setCancellable(new Cancellable() {
                    @Override public void cancel() throws Exception {
                        preferences.unregisterOnSharedPreferenceChangeListener(listener);
                    }
                });
                preferences.registerOnSharedPreferenceChangeListener(listener);
            }
        }).share(); // a single listener registration serves every observer
    }

Code example source: JakeWharton/RxReplayingShare

    @Override public Observable<T> apply(Observable<T> upstream) {
        LastSeen<T> lastSeen = new LastSeen<>();
        return new LastSeenObservable<>(upstream.doOnEach(lastSeen).share(), lastSeen);
    }

Code example source: bitrich-info/xchange-stream

    public Observable<BitmexWebSocketTransaction> subscribeBitmexChannel(String channelName) {
        return subscribeChannel(channelName).map(s -> {
            BitmexWebSocketTransaction transaction = objectMapper.treeToValue(s, BitmexWebSocketTransaction.class);
            return transaction;
        })
                .share();
    }

Code example source: manas-chaudhari/android-mvvm

    public ViewPagerAdapter(@NonNull Observable<List<ViewModel>> viewModels, @NonNull ViewProvider viewProvider, @NonNull ViewModelBinder binder) {
        source = viewModels
                .observeOn(AndroidSchedulers.mainThread())
                .doOnNext(new Consumer<List<ViewModel>>() {
                    @Override
                    public void accept(List<ViewModel> viewModels) throws Exception {
                        latestViewModels = (viewModels != null) ? viewModels : new ArrayList<ViewModel>();
                        notifyDataSetChanged();
                    }
                })
                .share();
        this.viewProvider = viewProvider;
        this.binder = binder;
    }

Code example source: bitrich-info/xchange-stream

    @Override
    public Observable<Trade> getTrades(CurrencyPair currencyPair, Object... args) {
        Observable<PoloniexWebSocketTradeEvent> subscribedTrades = service.subscribeCurrencyPairChannel(currencyPair)
                .filter(s -> s.getEventType().equals("t"))
                .map(s -> (PoloniexWebSocketTradeEvent) s).share();
        return subscribedTrades
                .map(s -> adaptPoloniexPublicTrade(s.toPoloniexPublicTrade(currencyPair), currencyPair));
    }

Code example source: bitrich-info/xchange-stream

    public Observable<JsonNode> subscribeChannel(String channelName) {
        LOG.info("Subscribing to channel {}.", channelName);
        return Observable.<JsonNode>create(e -> {
            if (!subscriptions.containsKey(channelName)) {
                subscriptions.put(channelName, e);
                pubnub.subscribe().channels(Collections.singletonList(channelName)).execute();
                LOG.debug("Subscribe channel: {}", channelName);
            }
        }).doOnDispose(() -> {
            LOG.debug("Unsubscribe channel: {}", channelName);
            pubnub.unsubscribe().channels(Collections.singletonList(channelName)).execute();
        }).share();
    }

Code example source: bitrich-info/xchange-stream

    public Observable<PoloniexWebSocketEvent> subscribeCurrencyPairChannel(CurrencyPair currencyPair) {
        String channelName = currencyPair.counter.toString() + "_" + currencyPair.base.toString();
        return subscribeChannel(channelName)
                .flatMapIterable(s -> {
                    PoloniexWebSocketEventsTransaction transaction = objectMapper.treeToValue(s, PoloniexWebSocketEventsTransaction.class);
                    return Arrays.asList(transaction.getEvents());
                }).share();
    }

Code example source: graphql-java/graphql-java-subscription-example

    public StockTickerPublisher() {
        Observable<StockPriceUpdate> stockPriceUpdateObservable = Observable.create(emitter -> {
            ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
            executorService.scheduleAtFixedRate(newStockTick(emitter), 0, 2, TimeUnit.SECONDS);
        });
        ConnectableObservable<StockPriceUpdate> connectableObservable = stockPriceUpdateObservable.share().publish();
        // connect() starts the upstream right away, without waiting for a subscriber.
        connectableObservable.connect();
        publisher = connectableObservable.toFlowable(BackpressureStrategy.BUFFER);
    }

Code example source: manas-chaudhari/android-mvvm

    private ReadOnlyField(@NonNull Observable<T> source) {
        super();
        this.source = source.doOnNext(new Consumer<T>() {
            @Override
            public void accept(T t) throws Exception {
                ReadOnlyField.super.set(t);
            }
        }).doOnError(new Consumer<Throwable>() {
            @Override
            public void accept(Throwable throwable) throws Exception {
                Log.e("ReadOnlyField", "onError in source observable", throwable);
            }
        }).onErrorResumeNext(Observable.<T>empty()).share();
    }

Code example source: networknt/light-example-4j

    public ChannelPublisher() {
        Observable<MessageAddedEvent> messageAddedEventObservable = Observable.create(messageAddedEventObservableEmitter -> {
            this.emitter = messageAddedEventObservableEmitter;
        });
        ConnectableObservable<MessageAddedEvent> connectableObservable = messageAddedEventObservable.share().publish();
        connectableObservable.connect();
        publisher = connectableObservable.toFlowable(BackpressureStrategy.BUFFER);
    }

Code example source: akarnokd/akarnokd-misc

    @Test
    public void test() {
        Observable<Integer> myObservable = Observable.just(1)
                .<Integer>flatMap(i -> {
                    throw new IllegalStateException();
                }).share();
        myObservable
                .zipWith(myObservable, Pair::of)
                .subscribe(pair -> {
                    //ignore
                }, throwable -> {
                    //ignore
                });
    }
