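This article collects code examples for the Java method io.reactivex.subjects.Subject.share() and shows how Subject.share() is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they should serve as a useful reference. Details of the Subject.share() method are as follows: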
Package path: io.reactivex.subjects.Subject
Class name: Subject
Method name: share
Description: none available
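Since no description is given, some background may help. In RxJava 2, Subject inherits share() from Observable, where it is defined as publish().refCount(): the upstream is connected when the first observer subscribes, shared by every later observer, and disconnected once the last observer disposes. The sketch below models only that reference-counting idea in plain Java, without RxJava. SharedSource, subscribe, emit, connected, and connectCount are hypothetical names chosen for illustration and are not part of the library's API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Plain-Java model of ref-counted sharing; this is NOT RxJava's implementation.
final class RefCountSketch {

    // Hypothetical stand-in for a shared source.
    static final class SharedSource<T> {
        private final List<Consumer<T>> listeners = new ArrayList<>();
        boolean connected = false; // true while at least one listener is attached
        int connectCount = 0;      // how many times the upstream was (re)connected

        // Attaches a listener and returns a handle that detaches it again.
        Runnable subscribe(Consumer<T> listener) {
            listeners.add(listener);
            if (listeners.size() == 1) { // the first listener connects the upstream
                connected = true;
                connectCount++;
            }
            return () -> {
                // the last listener leaving disconnects the upstream
                if (listeners.remove(listener) && listeners.isEmpty()) {
                    connected = false;
                }
            };
        }

        // Delivers one value to every currently attached listener.
        void emit(T value) {
            for (Consumer<T> l : new ArrayList<>(listeners)) {
                l.accept(value);
            }
        }
    }

    public static void main(String[] args) {
        SharedSource<Integer> source = new SharedSource<>();
        List<Integer> first = new ArrayList<>();
        List<Integer> second = new ArrayList<>();

        Runnable stopFirst = source.subscribe(first::add);
        source.emit(1);                    // only `first` is attached
        Runnable stopSecond = source.subscribe(second::add);
        source.emit(2);                    // both listeners receive 2
        stopFirst.run();
        stopSecond.run();                  // last listener gone: disconnected

        // prints "[1, 2] [2] connects=1"
        System.out.println(first + " " + second + " connects=" + source.connectCount);
    }
}
```

The second listener joins the existing connection instead of opening a new one, which is why it misses the value emitted before it subscribed and why connectCount stays at 1.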
Code example source: akarnokd/akarnokd-misc
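import io.reactivex.Observable;
import io.reactivex.subjects.PublishSubject;
import io.reactivex.subjects.Subject;
import ix.Ix;

// Enclosing class name is illustrative; the excerpt omits the original declaration.
public class ShareBufferExample {
    public static void main(String[] args) {
        // Serialized subject, so onNext may be called safely from several threads
        Subject<Integer> rp = PublishSubject.<Integer>create().toSerialized();
        Observable<Integer> share = rp.share();
        int numPrevious = 2;
        // Overlapping buffers of numPrevious items, advancing one item at a time
        share.buffer(numPrevious, 1)
            .filter(b -> b.size() < numPrevious || b.get(numPrevious - 1) >= 3)
            .map(v -> v.get(0))
            .subscribe(System.out::println);
        Ix.range(1, 10).foreach(rp::onNext);
        rp.onComplete();
    }
}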
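To make the buffer(numPrevious, 1) step above easier to follow, here is a plain-Java sketch of the same filter and map logic applied to overlapping windows. It assumes that buffer(count, 1) emits one window per starting item and flushes the shorter trailing windows on completion. The class and the windows and select helpers are hypothetical names for illustration, not RxJava API.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of the windowing logic; it does not use RxJava.
final class SlidingWindowSketch {

    // Hypothetical helper: overlapping windows of up to `count` items, advancing
    // by one item, including the shorter windows left at the end of the input.
    static List<List<Integer>> windows(List<Integer> items, int count) {
        List<List<Integer>> result = new ArrayList<>();
        for (int start = 0; start < items.size(); start++) {
            int end = Math.min(start + count, items.size());
            result.add(new ArrayList<>(items.subList(start, end)));
        }
        return result;
    }

    // Applies the same filter and map steps as the example above.
    static List<Integer> select(List<Integer> items, int numPrevious) {
        List<Integer> out = new ArrayList<>();
        for (List<Integer> w : windows(items, numPrevious)) {
            // keep short trailing windows, or full windows whose last item is >= 3
            if (w.size() < numPrevious || w.get(numPrevious - 1) >= 3) {
                out.add(w.get(0)); // emit the first item of the window
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> input = new ArrayList<>();
        for (int i = 1; i <= 10; i++) {
            input.add(i);
        }
        System.out.println(select(input, 2)); // prints [2, 3, 4, 5, 6, 7, 8, 9, 10]
    }
}
```

In this model the window [1, 2] is dropped because its last item is below 3, while the trailing single-item window [10] passes through the size check.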
Code example source: akarnokd/akarnokd-misc
@Test
public void test4() throws Exception {
    // Virtual-time scheduler, so the timeout can be triggered deterministically
    TestScheduler sch = new TestScheduler();
    Subject<Long> subject = PublishSubject.create();
    Observable<Long> initialObservable = subject.share()
        .map(value -> {
            System.out.println("Received value " + value);
            //new Exception().printStackTrace(System.out);
            return value;
        });
    // Fallback sequence that timeout() switches to once the timeout fires
    Observable<Long> timeoutObservable = initialObservable.map(value -> {
        System.out.println("Timeout received value " + value);
        return value;
    });
    TestObserver<Long> subscriber = new TestObserver<>();
    initialObservable
        .doOnDispose(() -> {
            System.out.println("Unsubscribed");
            new Exception().printStackTrace(System.out);
        })
        .timeout(1, TimeUnit.SECONDS, sch, timeoutObservable).subscribe(subscriber);
    subject.onNext(5L);
    // Advance virtual time past the 1-second timeout
    sch.advanceTimeBy(2, TimeUnit.SECONDS);
    subject.onNext(10L);
    subject.onComplete();
    subscriber.awaitTerminalEvent();
    subscriber.assertNoErrors();
    subscriber.assertValues(5L, 10L);
}