本文整理了Java中rx.Observable.replay()
方法的一些代码示例,展示了Observable.replay()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Observable.replay()
方法的具体详情如下:
包路径:rx.Observable
类名称:Observable
方法名:replay
[英]Returns a ConnectableObservable that shares a single subscription to the underlying Observable that will replay all of its items and notifications to any future Observer. A Connectable Observable resembles an ordinary Observable, except that it does not begin emitting items when it is subscribed to, but only when its connect method is called.
Backpressure Support: This operator does not support backpressure because multicasting means the stream is "hot" with multiple subscribers. Each child will need to manage backpressure independently using operators such as #onBackpressureDrop and #onBackpressureBuffer. Scheduler: This version of replay does not operate by default on a particular Scheduler.
[中]返回一个ConnectableObservable,它共享对基础Observable的单个订阅,该订阅将向任何未来的观察者重播其所有项目和通知。可连接的可观察对象类似于普通的可观察对象,只是它在订阅时不会开始发射项目,而只有在调用其连接方法时才会发射项目。
背压支持:该操作员不支持背压,因为多播意味着流对于多个订户来说是“热的”。每个孩子都需要使用#onBackpressureDrop和#onBackpressureBuffer等操作符独立管理背压。调度程序:默认情况下,此版本的replay不会在特定调度程序上运行。
代码示例来源:origin: ReactiveX/RxNetty
static <X> DisposableContentSource<X> createNew(Observable<X> source) {
final ArrayList<X> chunks = new ArrayList<>();
ConnectableObservable<X> replay = source.doOnNext(new Action1<X>() {
@Override
public void call(X x) {
chunks.add(x);
}
}).replay();
return new DisposableContentSource<>(new OnSubscribeImpl<X>(replay, chunks));
}
代码示例来源:origin: THEONE10211024/RxJavaSamples
/**
* This method will only be called once when the retained Fragment is first created.
*/
@Override
public void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
// Retain this fragment across configuration changes.
setRetainInstance(true);
if (_storedIntsObservable != null) {
return;
}
Observable<Integer> intsObservable =//
Observable.interval(1, TimeUnit.SECONDS)//
.map(new Func1<Long, Integer>() {
@Override
public Integer call(Long aLong) {
return aLong.intValue();
}
})//
.take(20);
// -----------------------------------------------------------------------------------
// Making our observable "HOT" for the purpose of the demo.
//_intsObservable = _intsObservable.share();
_storedIntsObservable = intsObservable.replay();
_storedIntsSubscription = _storedIntsObservable.connect();
// Do not do this in production!
// `.share` is "warm" not "hot"
// the below forceful subscription fakes the heat
//_intsObservable.subscribe();
}
代码示例来源:origin: com.netflix.rxjava/rxjava-core
return replay(bufferSize, time, unit, Schedulers.computation());
代码示例来源:origin: com.netflix.rxjava/rxjava-core
/**
* Returns a {@link ConnectableObservable} that shares a single subscription to the source Observable and
* replays all items emitted by that Observable within a specified time window. A Connectable Observable
* resembles an ordinary Observable, except that it does not begin emitting items when it is subscribed to,
* but only when its {@code connect} method is called.
* <p>
* <img width="640" height="515" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/replay.t.png" alt="">
* <dl>
* <dt><b>Backpressure Support:</b></dt>
* <dd>This operator does not support backpressure because multicasting means the stream is "hot" with
* multiple subscribers. Each child will need to manage backpressure independently using operators such
* as {@link #onBackpressureDrop} and {@link #onBackpressureBuffer}.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>This version of {@code replay} operates by default on the {@code computation} {@link Scheduler}.</dd>
* </dl>
*
* @param time
* the duration of the window in which the replayed items must have been emitted
* @param unit
* the time unit of {@code time}
* @return a {@link ConnectableObservable} that shares a single subscription to the source Observable and
* replays the items that were emitted during the window defined by {@code time}
* @see <a href="https://github.com/Netflix/RxJava/wiki/Connectable-Observable-Operators#observablereplay">RxJava wiki: replay</a>
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229232.aspx">MSDN: Observable.Replay</a>
*/
public final ConnectableObservable<T> replay(long time, TimeUnit unit) {
return replay(time, unit, Schedulers.computation());
}
代码示例来源:origin: com.netflix.rxjava/rxjava-core
return replay(selector, bufferSize, time, unit, Schedulers.computation());
代码示例来源:origin: com.netflix.rxjava/rxjava-core
return replay(selector, time, unit, Schedulers.computation());
代码示例来源:origin: techery/janet
private <A> Observable<A> createPipeline(Observable<A> source) {
return source.mergeWith(clearingStream).replay(1).autoConnect();
}
代码示例来源:origin: com.netflix.eureka/eureka2-write-server
@Override
public Observable<Void> connect(final Observable<InstanceInfo> registrant) {
final AtomicReference<InstanceInfo> instanceInfoRef = new AtomicReference<>();
return registrant.replay(1).refCount()
.flatMap(new Func1<InstanceInfo, Observable<Void>>() {
@Override
public Observable<Void> call(InstanceInfo instanceInfo) {
logger.info("registering self InstanceInfo {}", instanceInfo);
instanceInfoRef.set(instanceInfo);
return registry.register(instanceInfo, selfSource).ignoreElements().cast(Void.class);
}
})
.doOnUnsubscribe(new Action0() {
@Override
public void call() {
InstanceInfo info = instanceInfoRef.getAndSet(null);
if (info != null) {
logger.info("unregistering self InstanceInfo {}", info);
registry.unregister(info, selfSource).subscribe();
}
}
});
}
代码示例来源:origin: yahoo/fili
.filter(Either::isRight)
.map(Either::getRight)
.replay(1);
jobMetadata
.replay(1);
jobRowStoredNotification.connect();
jobMetadata
.replay(1);
preResponseStoredNotification.connect();
jobMetadata
.replay(1);
jobRowUpdatedNotification.connect();
代码示例来源:origin: yahoo/fili
.filter(ticket::equals)
.take(1)
.replay(1);
broadcastChannelNotifications.connect();
代码示例来源:origin: com.netflix.eureka/eureka2-core
final Observable<OP> opObservable = opStream.replay(1).refCount();
final Subscriber<OP> opSubscriber = new NoOpSubscriber<>();
final Observable<CHANNEL> channelObservable = channelObservableWithCleanUp(channelSubject);
代码示例来源:origin: akarnokd/akarnokd-misc
@Test
public void test1x() throws Exception {
rx.schedulers.TestScheduler scheduler = new rx.schedulers.TestScheduler();
rx.Observable<Integer> source = rx.Observable.just(1)
.replay(2, TimeUnit.SECONDS, scheduler)
.autoConnect();
source.test().assertResult(1);
source.test().assertResult(1);
scheduler.advanceTimeBy(3, TimeUnit.SECONDS);
source.test().assertResult();
}
代码示例来源:origin: akarnokd/akarnokd-misc
@Test
public void test() {
Observable<Integer> obs1 = Observable.create(emitter -> {
emitter.onNext(1);
emitter.onCompleted();
}, Emitter.BackpressureMode.BUFFER);
Observable<Integer> obs2 = Observable.create(emitter -> {
emitter.onNext(2);
emitter.onCompleted();
}, Emitter.BackpressureMode.BUFFER);
Observable<Integer> both = Observable
.concatDelayError(obs1, obs2)
.replay().autoConnect();
both.test().assertResult(1, 2);
both.test().assertResult(1, 2);
}
}
代码示例来源:origin: akarnokd/akarnokd-misc
@Test
public void testFlakyReplayBehavior2() {
final PublishSubject<Integer> subject = PublishSubject.create();
final TestSubscriber<Integer> subscriber = new TestSubscriber<>();
final ConnectableObservable<Integer> sums = subject.scan(1, (a, b) -> a + b).replay(1);
sums.connect();
subject.onNext(2);
subject.onNext(3);
sums.subscribe(subscriber);
// subscriber.assertValueCount(1);
subscriber.assertValues(6);
}
}
代码示例来源:origin: akarnokd/akarnokd-misc
@Test
public void testFlakyReplayBehavior() {
final TestScheduler scheduler = new TestScheduler();
final TestSubject<Integer> subject = TestSubject.create(scheduler);
final TestSubscriber<Integer> subscriber = new TestSubscriber<>();
final ConnectableObservable<Integer> sums = subject.scan(1, (a, b) -> a + b).replay(1);
sums.connect();
subject.onNext(2);
subject.onNext(3);
scheduler.triggerActions();
sums.subscribe(subscriber);
// subscriber.assertValueCount(1);
subscriber.assertValues(6);
}
代码示例来源:origin: akarnokd/akarnokd-misc
@Test
public void testExpectedReplayBehavior() {
final TestScheduler scheduler = new TestScheduler();
final TestSubject<Integer> subject = TestSubject.create(scheduler);
final TestSubscriber<Integer> subscriber = new TestSubscriber<>();
final ConnectableObservable<Integer> sums = subject.scan((a, b) -> a + b).replay(1);
sums.connect();
subject.onNext(1);
subject.onNext(2);
subject.onNext(3);
scheduler.triggerActions();
sums.subscribe(subscriber);
subscriber.assertValueCount(1);
subscriber.assertValues(6);
}
内容来源于网络,如有侵权,请联系作者删除!