io.reactivex.Flowable.share()方法的使用及代码示例

x33g5p2x  于2022-01-19 转载在 其他  
字(7.5k)|赞(0)|评价(0)|浏览(176)

本文整理了Java中io.reactivex.Flowable.share()方法的一些代码示例,展示了Flowable.share()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Flowable.share()方法的具体详情如下:
包路径:io.reactivex.Flowable
类名称:Flowable
方法名:share

Flowable.share介绍

[英]Returns a new Publisher that multicasts (and shares a single subscription to) the original Publisher. As long as there is at least one Subscriber this Publisher will be subscribed and emitting data. When all subscribers have canceled it will cancel the source Publisher.

This is an alias for #publish(). ConnectableFlowable#refCount().

Backpressure: The operator honors backpressure and expects the source Publisher to honor backpressure as well. If this expectation is violated, the operator will signal a MissingBackpressureException to its Subscribers. Scheduler: share does not operate by default on a particular Scheduler.
[中]返回多播(并共享单个订阅)原始发布服务器的新发布服务器。只要至少有一个订阅服务器,该发布服务器将被订阅并发送数据。当所有订阅服务器都已取消时,它将取消源发布服务器。
这是#publish()的别名。可连接的可流动#refCount()。
背压:操作员接受背压,并希望源发布者也接受背压。如果违反此预期,操作员将向其订户发出MissingBackpressureException信号。计划程序:共享默认情况下不会在特定计划程序上运行。

代码示例

代码示例来源:origin: ReactiveX/RxJava

@Override
  public Publisher<Integer> createPublisher(long elements) {
    return
        Flowable.range(0, (int)elements).share()
    ;
  }
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void doubleShare() {
  Iterable<Integer> it = Flowable.range(1, 300).blockingIterable();
    Flowable.just(it, it)
    .flatMapIterable(Functions.<Iterable<Integer>>identity())
    .share()
    .share()
    .count()
    .test()
    .assertResult(600L);
}

代码示例来源:origin: TeamNewPipe/NewPipe

/**
 * Part of subscription observation pipeline
 *
 * @see SubscriptionService#getSubscription()
 */
private Flowable<List<SubscriptionEntity>> getSubscriptionInfos() {
  return subscriptionTable().getAll()
      // Wait for a period of infrequent updates and return the latest update
      .debounce(SUBSCRIPTION_DEBOUNCE_INTERVAL, TimeUnit.MILLISECONDS)
      .share()            // Share allows multiple subscribers on the same observable
      .replay(1)          // Replay synchronizes subscribers to the last emitted result
      .autoConnect();
}

代码示例来源:origin: apache/incubator-gobblin

forkedStream = forkedStream.share();

代码示例来源:origin: ReactiveX/RxJava

sourceUnsubscribed.set(true);
}).share();

代码示例来源:origin: kaushikgopal/RxJava-Android-Samples

@Override
public void onStart() {
 super.onStart();
 _disposables = new CompositeDisposable();
 Flowable<Object> tapEventEmitter = _rxBus.asFlowable().share();
 _disposables.add(
   tapEventEmitter.subscribe(
     event -> {
      if (event instanceof RxBusDemoFragment.TapEvent) {
       _showTapText();
      }
     }));
 Flowable<Object> debouncedEmitter = tapEventEmitter.debounce(1, TimeUnit.SECONDS);
 Flowable<List<Object>> debouncedBufferEmitter = tapEventEmitter.buffer(debouncedEmitter);
 _disposables.add(
   debouncedBufferEmitter
     .observeOn(AndroidSchedulers.mainThread())
     .subscribe(
       taps -> {
        _showTapCount(taps.size());
       }));
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void multiShare() {
  Iterable<Integer> it = Flowable.range(1, 300).blockingIterable();
  for (int i = 0; i < 5; i++) {
    Flowable<Integer> f = Flowable.just(it, it)
    .flatMapIterable(Functions.<Iterable<Integer>>identity());
    for (int j = 0; j < i; j++) {
      f = f.share();
    }
    f
    .count()
    .test()
    .withTag("Share: " + i)
    .assertResult(600L);
  }
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void multiShareHidden() {
  Iterable<Integer> it = Flowable.range(1, 300).blockingIterable();
  for (int i = 0; i < 5; i++) {
    Flowable<Integer> f = Flowable.just(it, it)
    .flatMapIterable(Functions.<Iterable<Integer>>identity())
    .hide();
    for (int j = 0; j < i; j++) {
      f = f.share();
    }
    f
    .count()
    .test()
    .withTag("Share: " + i)
    .assertResult(600L);
  }
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void boundaryFusion() {
  Flowable.range(1, 10000)
  .observeOn(Schedulers.single())
  .map(new Function<Integer, String>() {
    @Override
    public String apply(Integer t) throws Exception {
      String name = Thread.currentThread().getName();
      if (name.contains("RxSingleScheduler")) {
        return "RxSingleScheduler";
      }
      return name;
    }
  })
  .share()
  .observeOn(Schedulers.computation())
  .distinct()
  .test()
  .awaitDone(5, TimeUnit.SECONDS)
  .assertResult("RxSingleScheduler");
}

代码示例来源:origin: ReactiveX/RxJava

.share()

代码示例来源:origin: JakeWharton/RxReplayingShare

@Override public Flowable<T> apply(Flowable<T> upstream) {
 LastSeen<T> lastSeen = new LastSeen<>();
 return new LastSeenFlowable<>(upstream.doOnEach(lastSeen).share(), lastSeen);
}

代码示例来源:origin: IvBaranov/RxBluetooth

/**
 * Observes byte from bluetooth's {@link InputStream}. Will be emitted per byte.
 *
 * @return RxJava Observable with {@link Byte}
 */
public Flowable<Byte> observeByteStream() {
 if (observeInputStream == null) {
  observeInputStream = Flowable.create(new FlowableOnSubscribe<Byte>() {
   @Override public void subscribe(final FlowableEmitter<Byte> subscriber) {
    while (!subscriber.isCancelled()) {
     try {
      subscriber.onNext((byte) inputStream.read());
     } catch (IOException e) {
      connected = false;
      subscriber.onError(new ConnectionClosedException("Can't read stream", e));
     } finally {
      if (!connected) {
       closeConnection();
      }
     }
    }
   }
  }, BackpressureStrategy.BUFFER).share();
 }
 return observeInputStream;
}

代码示例来源:origin: com.jakewharton.rx2/replaying-share

@Override public Flowable<T> apply(Flowable<T> upstream) {
 LastSeen<T> lastSeen = new LastSeen<>();
 return new LastSeenFlowable<>(upstream.doOnEach(lastSeen).share(), lastSeen);
}

代码示例来源:origin: ch.squaredesk.nova/http

@Override
public <T> Flowable<RpcInvocation<T>> requests(String destination, Class<T> targetType) {
  URL destinationAsLocalUrl;
  try {
    destinationAsLocalUrl = new URL("http", "localhost", destination);
  } catch (MalformedURLException e) {
    throw new RuntimeException(e);
  }
  Flowable retVal = mapDestinationToIncomingMessages
      .computeIfAbsent(destination, key -> {
        logger.info("Listening to requests on " + destination);
        Subject<RpcInvocation> stream = PublishSubject.create();
        stream = stream.toSerialized();
        NonBlockingHttpHandler httpHandler = new NonBlockingHttpHandler(destinationAsLocalUrl, messageTranscriber, targetType, stream);
        httpServer.getServerConfiguration().addHttpHandler(httpHandler, destination);
        return stream.toFlowable(BackpressureStrategy.BUFFER)
            .doFinally(() -> {
              mapDestinationToIncomingMessages.remove(destination);
              httpServer.getServerConfiguration().removeHttpHandler(httpHandler);
              logger.info("Stopped listening to requests on " + destination);
            })
            .share();
      });
  return retVal;
}

代码示例来源:origin: com.linkedin.gobblin/gobblin-api

forkedStream = forkedStream.share();

代码示例来源:origin: bsideup/graphql-java-reactive

Flowable<Change> changeFlow = Flowable.fromPublisher((Publisher<Change>) data)
    .share();

代码示例来源:origin: commonsguy/cw-androidarch

public RosterViewModel(Application ctxt) {
 super(ctxt);
 ObservableTransformer<Result, ViewState> toView=
  results -> (results.map(result -> {
   lastState=foldResultIntoState(lastState, result);
   return(lastState);
  }));
 Controller controller=new Controller(ctxt);
 states=LiveDataReactiveStreams
  .fromPublisher(controller.resultStream()
   .subscribeOn(Schedulers.single())
   .compose(toView)
   .cache()
   .toFlowable(BackpressureStrategy.LATEST)
   .share());
 controller.subscribeToActions(actionSubject);
 process(Action.load());
}

代码示例来源:origin: d4rken/RxShell

.share();

代码示例来源:origin: akarnokd/akarnokd-misc

@Test
  public void test2() {
    Flowable.just(Ix.range(0, 600))
    .flatMapIterable(x->x)
    .doOnNext(System.out::println)
    .share()
    .share()
    .count()
    .doOnEvent((a, b) -> {
      if (a != null) {
        System.out.println(a);
      } else {
        b.printStackTrace();
      }
    })
    .test()
    .assertResult(600L);
  }
}

代码示例来源:origin: akarnokd/akarnokd-misc

@Test
public void test() {
  Flowable.just(Ix.range(0, 300), Ix.range(0, 300))
  .flatMapIterable(x->x)
  .doOnNext(System.out::println)
  .share()
  .share()
  .count()
  .doOnEvent((a, b) -> {
    if (a != null) {
      System.out.println(a);
    } else {
      b.printStackTrace();
    }
  })
  .test()
  .assertResult(600L);
}

相关文章

Flowable类方法