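This article collects code examples for the Java method io.reactivex.Flowable.share() and shows how Flowable.share() is used in practice. The examples were gathered from platforms such as GitHub, Stack Overflow, and Maven and are taken from selected projects, so they should serve as a useful reference. Details of the Flowable.share() method are as follows: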
Package: io.reactivex
Class: Flowable
Method: share
Returns a new Publisher that multicasts (and shares a single subscription to) the original Publisher. As long as there is at least one Subscriber, this Publisher stays subscribed to the source and emits data. When all Subscribers have canceled, it cancels the source Publisher.
This is an alias for publish().refCount(), that is, Flowable#publish() followed by ConnectableFlowable#refCount().
Backpressure: The operator honors backpressure and expects the source Publisher to honor backpressure as well. If this expectation is violated, the operator signals a MissingBackpressureException to its Subscribers.
Scheduler: share does not operate by default on a particular Scheduler.
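The reference-counting behavior described above can be illustrated with a small model written against the plain JDK. This is only a sketch under simplifying assumptions, not RxJava's implementation: the names `RefCountShareSketch`, `Source`, and `demo` are hypothetical, the model is single-threaded, and it ignores backpressure and terminal events. It shows only that the first subscriber opens the single upstream connection and that the last cancellation closes it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

/** Simplified, single-threaded model of publish().refCount(); not RxJava code. */
final class RefCountShareSketch<T> {

    /** Hypothetical source: connect a sink, get back a handle that cancels the connection. */
    interface Source<T> {
        Runnable connect(Consumer<T> sink);
    }

    private final Source<T> source;
    private final List<Consumer<T>> subscribers = new CopyOnWriteArrayList<>();
    private Runnable upstreamCancel; // non-null while connected to the source

    RefCountShareSketch(Source<T> source) {
        this.source = source;
    }

    /** Adds a subscriber and returns a handle that cancels only this subscriber. */
    Runnable subscribe(Consumer<T> subscriber) {
        subscribers.add(subscriber);
        if (subscribers.size() == 1) {
            // First subscriber: open the one shared upstream connection
            // and fan every item out to all current subscribers.
            upstreamCancel = source.connect(item -> subscribers.forEach(s -> s.accept(item)));
        }
        return () -> {
            // Last subscriber gone: cancel the upstream connection.
            if (subscribers.remove(subscriber) && subscribers.isEmpty()) {
                upstreamCancel.run();
                upstreamCancel = null;
            }
        };
    }

    /** Runs a small scenario with two subscribers and returns the event log. */
    static List<String> demo() {
        List<String> log = new ArrayList<>();
        List<Consumer<Integer>> sinkHolder = new ArrayList<>(); // sink of the "hot" source
        Source<Integer> hot = sink -> {
            log.add("connect");
            sinkHolder.add(sink);
            return () -> {
                log.add("cancel");
                sinkHolder.clear();
            };
        };

        RefCountShareSketch<Integer> shared = new RefCountShareSketch<>(hot);
        Runnable cancelA = shared.subscribe(v -> log.add("A:" + v)); // triggers "connect"
        Runnable cancelB = shared.subscribe(v -> log.add("B:" + v)); // reuses the connection
        sinkHolder.get(0).accept(1); // delivered to A and B
        cancelA.run();               // B is still subscribed, upstream stays open
        sinkHolder.get(0).accept(2); // delivered to B only
        cancelB.run();               // last subscriber left, upstream is canceled
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [connect, A:1, B:1, B:2, cancel]
    }
}
```

The log contains a single "connect" entry even though two subscribers were attached, which is the property that share() provides over subscribing to the source twice.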
Code example source: ReactiveX/RxJava
@Override
public Publisher<Integer> createPublisher(long elements) {
    return Flowable.range(0, (int) elements).share();
}
Code example source: ReactiveX/RxJava
@Test
public void doubleShare() {
    Iterable<Integer> it = Flowable.range(1, 300).blockingIterable();
    // Two iterables of 300 items each are flattened, so 600 items are counted.
    Flowable.just(it, it)
        .flatMapIterable(Functions.<Iterable<Integer>>identity())
        .share()
        .share()
        .count()
        .test()
        .assertResult(600L);
}
Code example source: TeamNewPipe/NewPipe
/**
 * Part of subscription observation pipeline
 *
 * @see SubscriptionService#getSubscription()
 */
private Flowable<List<SubscriptionEntity>> getSubscriptionInfos() {
    return subscriptionTable().getAll()
        // Wait for a period of infrequent updates and return the latest update
        .debounce(SUBSCRIPTION_DEBOUNCE_INTERVAL, TimeUnit.MILLISECONDS)
        .share()        // Share allows multiple subscribers on the same observable
        .replay(1)      // Replay synchronizes subscribers to the last emitted result
        .autoConnect();
}
Code example source: apache/incubator-gobblin
forkedStream = forkedStream.share();
Code example source: ReactiveX/RxJava
sourceUnsubscribed.set(true);
}).share();
Code example source: kaushikgopal/RxJava-Android-Samples
@Override
public void onStart() {
    super.onStart();
    _disposables = new CompositeDisposable();
    // share() lets the tap handler and the buffer/debounce pipeline below reuse one bus subscription
    Flowable<Object> tapEventEmitter = _rxBus.asFlowable().share();
    _disposables.add(
        tapEventEmitter.subscribe(
            event -> {
                if (event instanceof RxBusDemoFragment.TapEvent) {
                    _showTapText();
                }
            }));
    Flowable<Object> debouncedEmitter = tapEventEmitter.debounce(1, TimeUnit.SECONDS);
    Flowable<List<Object>> debouncedBufferEmitter = tapEventEmitter.buffer(debouncedEmitter);
    _disposables.add(
        debouncedBufferEmitter
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(
                taps -> {
                    _showTapCount(taps.size());
                }));
}
Code example source: ReactiveX/RxJava
@Test
public void multiShare() {
    Iterable<Integer> it = Flowable.range(1, 300).blockingIterable();
    for (int i = 0; i < 5; i++) {
        Flowable<Integer> f = Flowable.just(it, it)
            .flatMapIterable(Functions.<Iterable<Integer>>identity());
        // Apply share() i times in a row
        for (int j = 0; j < i; j++) {
            f = f.share();
        }
        f
            .count()
            .test()
            .withTag("Share: " + i)
            .assertResult(600L);
    }
}
Code example source: ReactiveX/RxJava
@Test
public void multiShareHidden() {
    Iterable<Integer> it = Flowable.range(1, 300).blockingIterable();
    for (int i = 0; i < 5; i++) {
        Flowable<Integer> f = Flowable.just(it, it)
            .flatMapIterable(Functions.<Iterable<Integer>>identity())
            .hide();
        // Apply share() i times in a row
        for (int j = 0; j < i; j++) {
            f = f.share();
        }
        f
            .count()
            .test()
            .withTag("Share: " + i)
            .assertResult(600L);
    }
}
Code example source: ReactiveX/RxJava
@Test
public void boundaryFusion() {
    Flowable.range(1, 10000)
        .observeOn(Schedulers.single())
        .map(new Function<Integer, String>() {
            @Override
            public String apply(Integer t) throws Exception {
                String name = Thread.currentThread().getName();
                if (name.contains("RxSingleScheduler")) {
                    return "RxSingleScheduler";
                }
                return name;
            }
        })
        .share()
        .observeOn(Schedulers.computation())
        .distinct()
        .test()
        .awaitDone(5, TimeUnit.SECONDS)
        .assertResult("RxSingleScheduler");
}
Code example source: ReactiveX/RxJava
.share()
Code example source: JakeWharton/RxReplayingShare
@Override public Flowable<T> apply(Flowable<T> upstream) {
    LastSeen<T> lastSeen = new LastSeen<>();
    return new LastSeenFlowable<>(upstream.doOnEach(lastSeen).share(), lastSeen);
}
Code example source: IvBaranov/RxBluetooth
/**
 * Observes bytes from the Bluetooth {@link InputStream}. One item is emitted per byte.
 *
 * @return RxJava Flowable with {@link Byte}
 */
public Flowable<Byte> observeByteStream() {
    if (observeInputStream == null) {
        observeInputStream = Flowable.create(new FlowableOnSubscribe<Byte>() {
            @Override public void subscribe(final FlowableEmitter<Byte> subscriber) {
                while (!subscriber.isCancelled()) {
                    try {
                        subscriber.onNext((byte) inputStream.read());
                    } catch (IOException e) {
                        connected = false;
                        subscriber.onError(new ConnectionClosedException("Can't read stream", e));
                    } finally {
                        if (!connected) {
                            closeConnection();
                        }
                    }
                }
            }
        }, BackpressureStrategy.BUFFER).share();
    }
    return observeInputStream;
}
Code example source: ch.squaredesk.nova/http
@Override
public <T> Flowable<RpcInvocation<T>> requests(String destination, Class<T> targetType) {
    URL destinationAsLocalUrl;
    try {
        destinationAsLocalUrl = new URL("http", "localhost", destination);
    } catch (MalformedURLException e) {
        throw new RuntimeException(e);
    }
    Flowable retVal = mapDestinationToIncomingMessages
        .computeIfAbsent(destination, key -> {
            logger.info("Listening to requests on " + destination);
            Subject<RpcInvocation> stream = PublishSubject.create();
            stream = stream.toSerialized();
            NonBlockingHttpHandler httpHandler = new NonBlockingHttpHandler(destinationAsLocalUrl, messageTranscriber, targetType, stream);
            httpServer.getServerConfiguration().addHttpHandler(httpHandler, destination);
            return stream.toFlowable(BackpressureStrategy.BUFFER)
                .doFinally(() -> {
                    mapDestinationToIncomingMessages.remove(destination);
                    httpServer.getServerConfiguration().removeHttpHandler(httpHandler);
                    logger.info("Stopped listening to requests on " + destination);
                })
                .share();
        });
    return retVal;
}
Code example source: bsideup/graphql-java-reactive
Flowable<Change> changeFlow = Flowable.fromPublisher((Publisher<Change>) data)
    .share();
Code example source: commonsguy/cw-androidarch
public RosterViewModel(Application ctxt) {
    super(ctxt);
    ObservableTransformer<Result, ViewState> toView =
        results -> (results.map(result -> {
            lastState = foldResultIntoState(lastState, result);
            return (lastState);
        }));
    Controller controller = new Controller(ctxt);
    states = LiveDataReactiveStreams
        .fromPublisher(controller.resultStream()
            .subscribeOn(Schedulers.single())
            .compose(toView)
            .cache()
            .toFlowable(BackpressureStrategy.LATEST)
            .share());
    controller.subscribeToActions(actionSubject);
    process(Action.load());
}
Code example source: d4rken/RxShell
.share();
Code example source: akarnokd/akarnokd-misc
@Test
public void test2() {
    Flowable.just(Ix.range(0, 600))
        .flatMapIterable(x -> x)
        .doOnNext(System.out::println)
        .share()
        .share()
        .count()
        .doOnEvent((a, b) -> {
            if (a != null) {
                System.out.println(a);
            } else {
                b.printStackTrace();
            }
        })
        .test()
        .assertResult(600L);
}
Code example source: akarnokd/akarnokd-misc
@Test
public void test() {
    Flowable.just(Ix.range(0, 300), Ix.range(0, 300))
        .flatMapIterable(x -> x)
        .doOnNext(System.out::println)
        .share()
        .share()
        .count()
        .doOnEvent((a, b) -> {
            if (a != null) {
                System.out.println(a);
            } else {
                b.printStackTrace();
            }
        })
        .test()
        .assertResult(600L);
}