本文整理了Java中rx.subjects.Subject.subscribe()
方法的一些代码示例,展示了Subject.subscribe()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Subject.subscribe()
方法的具体详情如下:
包路径:rx.subjects.Subject
类名称:Subject
方法名:subscribe
暂无
代码示例来源:origin: com.netflix.eureka/eureka2-core
@Override
public void call(Subscriber<? super Void> subscriber) {
ackSubject.subscribe(subscriber);
}
};
代码示例来源:origin: com.netflix.eureka/eureka2-core
@Override
public void call(Subscriber<? super T> subscriber) {
notificationSubject.subscribe(subscriber);
}
}, notificationSubject);
代码示例来源:origin: akarnokd/akarnokd-misc
public Subscription subscribe(Observer<TestTransaction> transactionObserver) {
return transactionSubject.subscribe(transactionObserver);
}
代码示例来源:origin: bravekingzhang/CleanArch
/**
* 订阅
* @param subscription new CompositeSubscription()
* @param eventLisener
*/
public void subscribe(@NonNull CompositeSubscription subscription,@NonNull final EventLisener eventLisener){
subscription.add(_bus.subscribe(new Action1<Object>() {
@Override
public void call(Object event) {
eventLisener.dealRxEvent(event);
}
}));
}
代码示例来源:origin: com.netflix.eureka/eureka2-core
@Override
public void call(Subscriber<? super ChangeNotification<T>> subscriber) {
// We need to buffer all the changes while the init state is replayed.
// Because new instance holder updates will be added while we replay them, they will be
// partially visible by the subscriber. When we replay buffered real time updates,
// they may overlap with what was already sent from the init holder.
// TODO can we make this buffering cheaper?
final PauseableSubject<ChangeNotification<T>> realTimeSubject = PauseableSubject.create();
realTimeSubject.pause();
realTimeSource.subscribe(realTimeSubject);
realTimeSubject.mergeWith(Observable.from(initStateHolder).doOnCompleted(new Action0() {
@Override
public void call() {
realTimeSubject.resume();
}
})).subscribe(subscriber);
}
});
代码示例来源:origin: couchbase/java-dcp-client
public HttpStreamingConfigProvider(final ClientEnvironment env) {
super(LifecycleState.DISCONNECTED);
this.env = env;
this.remoteHosts = new AtomicReference<>(env.clusterAt());
this.configStream = BehaviorSubject.<CouchbaseBucketConfig>create().toSerialized();
configStream.subscribe(new Subscriber<CouchbaseBucketConfig>() {
@Override
public void onCompleted() {
LOGGER.debug("Config stream completed.");
}
@Override
public void onError(Throwable e) {
LOGGER.warn("Error on config stream!", e);
}
@Override
public void onNext(CouchbaseBucketConfig config) {
List<InetSocketAddress> newNodes = new ArrayList<>();
for (NodeInfo node : config.nodes()) {
Integer port = (env.sslEnabled() ? node.sslServices() : node.services()).get(ServiceType.CONFIG);
newNodes.add(new InetSocketAddress(node.rawHostname(), port));
}
LOGGER.trace("Updated config stream node list to {}.", newNodes);
remoteHosts.set(newNodes);
}
});
}
代码示例来源:origin: yahoo/fili
lengthBroadcaster.subscribe(responseLengthObserver);
OutputStream stream = new LengthOfOutputStream(response.getEntityStream(), lengthBroadcaster);
代码示例来源:origin: com.couchbase.client/core-io
@Override
public void call(final Subscriber<? super T> subscriber) {
if (state.casState(State.STATES.UNSUBSCRIBED, State.STATES.SUBSCRIBED)) {
subscriber.add(Subscriptions.create(new Action0() {
@Override
public void call() {
if (null != state.onUnsubscribe) {
state.onUnsubscribe.call();
}
}
}));
state.bufferedSubject.subscribe(subscriber);
state.unsubscribeTimeoutSubscription();
} else if(State.STATES.SUBSCRIBED.ordinal() == state.state) {
String thisObservable = "This Observable ";
if (state.traceId != null) {
thisObservable = "This Observable (" + state.traceId + ") ";
}
subscriber.onError(new IllegalStateException(thisObservable + "can only have one subscription. "
+ "Use Observable.publish() if you want to multicast."));
} else if(State.STATES.DISPOSED.ordinal() == state.state) {
String thisObservable = "The content of this Observable ";
if (state.traceId != null) {
thisObservable = "The content of this Observable (" + state.traceId + ") ";
}
subscriber.onError(new IllegalStateException(thisObservable + "is already released. "
+ "Subscribe earlier or tune the CouchbaseEnvironment#autoreleaseAfter() setting."));
}
}
}
代码示例来源:origin: couchbase/couchbase-jvm-core
@Override
public void call(final Subscriber<? super T> subscriber) {
if (state.casState(State.STATES.UNSUBSCRIBED, State.STATES.SUBSCRIBED)) {
subscriber.add(Subscriptions.create(new Action0() {
@Override
public void call() {
if (null != state.onUnsubscribe) {
state.onUnsubscribe.call();
}
}
}));
state.bufferedSubject.subscribe(subscriber);
state.unsubscribeTimeoutSubscription();
} else if(State.STATES.SUBSCRIBED.ordinal() == state.state) {
String thisObservable = "This Observable ";
if (state.traceId != null) {
thisObservable = "This Observable (" + state.traceId + ") ";
}
subscriber.onError(new IllegalStateException(thisObservable + "can only have one subscription. "
+ "Use Observable.publish() if you want to multicast."));
} else if(State.STATES.DISPOSED.ordinal() == state.state) {
String thisObservable = "The content of this Observable ";
if (state.traceId != null) {
thisObservable = "The content of this Observable (" + state.traceId + ") ";
}
subscriber.onError(new IllegalStateException(thisObservable + "is already released. "
+ "Subscribe earlier or tune the CouchbaseEnvironment#autoreleaseAfter() setting."));
}
}
}
内容来源于网络,如有侵权,请联系作者删除!