本文整理了Java中io.reactivex.subjects.Subject.subscribe()方法的一些代码示例,展示了Subject.subscribe()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。Subject.subscribe()方法的具体详情如下:
包路径:io.reactivex.subjects.Subject
类名称:Subject
方法名:subscribe
暂无
代码示例来源:origin: ReactiveX/RxJava
/**
 * Forwards the incoming observer to the wrapped {@code actual} source.
 *
 * @param observer the observer that is subscribed to {@code actual}
 */
@Override
protected void subscribeActual(Observer<? super T> observer) {
    actual.subscribe(observer);
}
代码示例来源:origin: redisson/redisson
/**
 * Subscribes the given observer to {@code actual}; no other work is done here.
 *
 * @param observer the downstream observer handed on to {@code actual}
 */
@Override
protected void subscribeActual(Observer<? super T> observer) {
    actual.subscribe(observer);
}
代码示例来源:origin: ReactiveX/RxJava
/**
 * Pipes a materialized, delayed and then dematerialized error into a {@code ReplaySubject},
 * subscribes to that subject without any callbacks, and finally blocks for the first
 * materialized notification of the subject before printing "Done".
 */
@Test
public void testErrorThrownIssue1685() {
    Subject<Object> subject = ReplaySubject.create();

    // Error wrapped in a notification and held back for one second.
    Observable<Notification<Object>> delayedFailure =
            Observable.error(new RuntimeException("oops"))
                    .materialize()
                    .delay(1, TimeUnit.SECONDS);

    // Unwrap the notification again and feed the result into the subject.
    delayedFailure
            .dematerialize(Functions.<Notification<Object>>identity())
            .subscribe(subject);

    subject.subscribe();
    subject.materialize().blockingFirst();

    System.out.println("Done");
}
代码示例来源:origin: akarnokd/RxJava2Interop
/**
 * Wraps the 1.x subscriber in a {@code SourceObserver}, registers that wrapper with the
 * subscriber via {@code add} and {@code setProducer}, and then subscribes it to
 * {@code subject}.
 *
 * @param t the 1.x subscriber to connect to {@code subject}
 */
@Override
public void call(rx.Subscriber<? super T> t) {
    SourceObserver<T> bridge = new SourceObserver<T>(t);

    // Register the wrapper with the subscriber before the subject can start emitting.
    t.add(bridge);
    t.setProducer(bridge);

    subject.subscribe(bridge);
}
代码示例来源:origin: akarnokd/RxJava2Extensions
/**
 * Hands the observer straight to the wrapped {@code subject}.
 *
 * @param observer the observer to subscribe to {@code subject}
 */
@Override
protected void subscribeActual(Observer<? super T> observer) {
    subject.subscribe(observer);
}
代码示例来源:origin: com.github.akarnokd/rxjava2-extensions
/**
 * Delegates the subscription to {@code subject} without any additional processing.
 *
 * @param observer the downstream observer passed on to {@code subject}
 */
@Override
protected void subscribeActual(Observer<? super T> observer) {
    subject.subscribe(observer);
}
代码示例来源:origin: k-kagurazaka/rx-property-android
/**
 * Subscribes the observer to {@code valueEmitter}.
 *
 * @param observer the observer to attach to {@code valueEmitter}
 */
@Override
protected void subscribeActual(Observer<? super T> observer) {
    valueEmitter.subscribe(observer);
}
代码示例来源:origin: k-kagurazaka/rx-property-android
/**
 * Subscribes the observer to {@code kicker}.
 *
 * @param observer the observer to attach to {@code kicker}
 */
@Override
protected void subscribeActual(final Observer<? super T> observer) {
    kicker.subscribe(observer);
}
代码示例来源:origin: k-kagurazaka/rx-property-android
/**
 * Passes the observer on to {@code valueEmitter}; nothing else happens in this method.
 *
 * @param observer the downstream observer forwarded to {@code valueEmitter}
 */
@Override
protected void subscribeActual(Observer<? super T> observer) {
    valueEmitter.subscribe(observer);
}
代码示例来源:origin: info.magnolia.ui/magnolia-ui-framework
/**
 * Subscribes {@code action} to {@code subject} together with an error handler that logs
 * the failure.
 *
 * @param action callback invoked with each value emitted by {@code subject}
 * @return the {@code Disposable} returned by the subscription
 */
@Override
public Disposable observe(Consumer<Optional<T>> action) {
    // Errors are reported through the logger together with the throwable itself.
    Consumer<Throwable> onFailure =
            e -> log.error("Failed to dispatch context property change: {}", e.getMessage(), e);
    return subject.subscribe(action, onFailure);
}
代码示例来源:origin: akarnokd/RxJava2Extensions
/**
 * Wraps the observer in a {@code RefCountObserver} and tries to register it via {@code add}.
 * If registration succeeds the wrapper is subscribed to {@code actual}; otherwise the
 * observer is signalled an {@code IllegalStateException} through {@code EmptyDisposable.error}.
 *
 * @param observer the downstream observer
 */
@Override
protected void subscribeActual(Observer<? super T> observer) {
    RefCountObserver<T> wrapper = new RefCountObserver<T>(observer, this);
    if (add(wrapper)) {
        actual.subscribe(wrapper);
    } else {
        EmptyDisposable.error(new IllegalStateException("RefCountSubject terminated"), observer);
    }
}
代码示例来源:origin: com.github.akarnokd/rxjava2-extensions
/**
 * Registers a {@code RefCountObserver} around the given observer and, when {@code add}
 * accepts it, subscribes it to {@code actual}. When {@code add} rejects it, the observer
 * receives an {@code IllegalStateException} via {@code EmptyDisposable.error} instead.
 *
 * @param observer the downstream observer
 */
@Override
protected void subscribeActual(Observer<? super T> observer) {
    RefCountObserver<T> counted = new RefCountObserver<T>(observer, this);
    if (add(counted)) {
        actual.subscribe(counted);
    } else {
        EmptyDisposable.error(new IllegalStateException("RefCountSubject terminated"), observer);
    }
}
代码示例来源:origin: com.github.akarnokd/rxjava2-interop
/**
 * Creates a {@code SourceObserver} around the 1.x subscriber, attaches it to the subscriber
 * with {@code add} and {@code setProducer}, then subscribes it to {@code subject}.
 *
 * @param t the 1.x subscriber to connect to {@code subject}
 */
@Override
public void call(rx.Subscriber<? super T> t) {
    SourceObserver<T> adapter = new SourceObserver<T>(t);

    // Attach to the subscriber first, then subscribe to the subject.
    t.add(adapter);
    t.setProducer(adapter);

    subject.subscribe(adapter);
}
代码示例来源:origin: k-kagurazaka/rx-property-android
validationTriggerDisposable = validationTrigger.subscribe(new Consumer<T>() {
@Override
public void accept(T value) {
代码示例来源:origin: akarnokd/akarnokd-misc
/**
 * Runs a {@code scan} over a {@code PublishSubject}, multicasts the result through a
 * {@code BehaviorSubject}, and subscribes to it twice, one subscriber after the other,
 * printing every value each subscriber receives.
 */
@Test
public void test() {
    Subject<String> source = PublishSubject.create();
    Observable<String> accumulated = source.scan("zero", (a, b) -> a + ", " + b);

    Subject<String> multicast = BehaviorSubject.create();
    accumulated.subscribe(multicast);

    Disposable firstSubscription = multicast.subscribe(System.out::println); // "zero"
    source.onNext("one"); // "zero, one"
    firstSubscription.dispose();

    Disposable secondSubscription = multicast.subscribe(System.out::println); // "zero, one"
    source.onNext("two"); // "zero, one, two"
    secondSubscription.dispose();
}
}
代码示例来源:origin: adamkewley/jobson
/**
 * Takes the next job off {@code jobQueue}, if there is one, and submits it to
 * {@code jobExecutor}.
 *
 * <p>Two {@code PublishSubject}s are created for the job's stdout and stderr. Both are passed
 * to {@code jobDAO} and subscribed to the job's queued listeners before execution starts.
 * After a successful submission the job is stored in {@code executingJobs} and its status
 * is set to {@code RUNNING}. Any {@code Throwable} raised while starting the job is logged
 * together with its stack trace and the job is marked {@code FATAL_ERROR}.
 */
private void advanceJobQueue() {
    final QueuedJob queuedJob = jobQueue.poll();
    if (queuedJob == null) {
        return; // queue is empty, nothing to start
    }

    final Subject<byte[]> stdout = PublishSubject.create();
    final Subject<byte[]> stderr = PublishSubject.create();

    jobDAO.appendStdout(queuedJob.getId(), stdout);
    jobDAO.appendStderr(queuedJob.getId(), stderr);

    stdout.subscribe(queuedJob.getQueuedListeners().getOnStdoutListener());
    stderr.subscribe(queuedJob.getQueuedListeners().getOnStderrListener());

    try {
        final CancelablePromise<JobExecutionResult> executionPromise =
                jobExecutor.execute(queuedJob, JobEventListeners.create(stdout, stderr));

        final ExecutingJob executingJob =
                ExecutingJob.fromQueuedJob(queuedJob, now(), stdout, stderr);

        executingJobs.put(executingJob.getId(), executingJob);

        updateJobStatus(queuedJob.getId(), RUNNING, "Submitted to executor");

        executionPromise.thenAccept(res -> {
            onExecutionFinished(executingJob, res);
        });

        // Cancelling the job's completion promise cancels the underlying execution as well.
        executingJob.getCompletionPromise().onCancel(() -> {
            executionPromise.cancel(true);
        });
    } catch (Throwable ex) {
        // Pass the throwable itself so the logger records the stack trace, not only toString().
        log.error("Error starting job execution: " + ex.toString(), ex);
        updateJobStatus(queuedJob.getId(), FATAL_ERROR, "Error executing job: " + ex.toString());
    }
}
代码示例来源:origin: akarnokd/RxJava2Extensions
rcp.subscribe(new Observer<Integer>() {
代码示例来源:origin: akarnokd/RxJava2Extensions
rcp.subscribe(new Observer<Integer>() {
内容来源于网络,如有侵权,请联系作者删除!