本文整理了Java中rx.Observable.unsafeSubscribe()
方法的一些代码示例,展示了Observable.unsafeSubscribe()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Observable.unsafeSubscribe()
方法的具体详情如下:
包路径:rx.Observable
类名称:Observable
方法名:unsafeSubscribe
[英]Subscribes to an Observable and invokes OnSubscribe function without any contract protection, error handling, unsubscribe, or execution hooks.
Use this only for implementing an Operator that requires nested subscriptions. For other purposes, use #subscribe(Subscriber) which ensures the Rx contract and other functionality. Scheduler: unsafeSubscribe does not operate by default on a particular Scheduler.
[中]订阅Observable并调用OnSubscribe函数,而无需任何契约保护、错误处理、取消订阅或执行挂钩。
仅用于实现需要嵌套订阅的运算符。出于其他目的,请使用#订阅(订户),以确保接收合同和其他功能。调度程序:默认情况下,unsafeSubscribe不会在特定调度程序上运行。
代码示例来源:origin: ReactiveX/RxNetty
@Override
public void call(Subscriber<? super T> subscriber) {
source.unsafeSubscribe(subscriber);
}
});
代码示例来源:origin: ReactiveX/RxNetty
@Override
public void call(Subscriber<? super PooledConnection<R, W>> subscriber) {
final IdleConnectionsHolder<W, R> holderForThisEL = perElHolder.get();
if (null == holderForThisEL) {
/*Caller is not an eventloop*/
PreferCurrentEventLoopHolder.super.pollThisEventLoopConnections().unsafeSubscribe(subscriber);
} else {
holderForThisEL.poll().unsafeSubscribe(subscriber);
}
}
});
代码示例来源:origin: ReactiveX/RxNetty
@Override
public void call(Subscriber<? super T> subscriber) {
if (disposed.get()) {
subscriber.onError(new IllegalStateException("Content source is already disposed."));
}
boolean connectNow = false;
synchronized (this) {
if (!subscribed) {
connectNow = true;
subscribed = true;
}
}
source.doOnNext(new Action1<T>() {
@Override
public void call(T msg) {
ReferenceCountUtil.retain(msg);
}
}).unsafeSubscribe(subscriber);
if (connectNow) {
source.connect();
}
}
}
代码示例来源:origin: PipelineAI/pipeline
/* package */ HystrixThreadEventStream(Thread thread) {
this.threadId = thread.getId();
this.threadName = thread.getName();
writeOnlyCommandStartSubject = PublishSubject.create();
writeOnlyCommandCompletionSubject = PublishSubject.create();
writeOnlyCollapserSubject = PublishSubject.create();
writeOnlyCommandStartSubject
.onBackpressureBuffer()
.doOnNext(writeCommandStartsToShardedStreams)
.unsafeSubscribe(Subscribers.empty());
writeOnlyCommandCompletionSubject
.onBackpressureBuffer()
.doOnNext(writeCommandCompletionsToShardedStreams)
.unsafeSubscribe(Subscribers.empty());
writeOnlyCollapserSubject
.onBackpressureBuffer()
.doOnNext(writeCollapserExecutionsToShardedStreams)
.unsafeSubscribe(Subscribers.empty());
}
代码示例来源:origin: ReactiveX/RxNetty
@Override
public void call(Subscriber<? super Void> subscriber) {
if (!isUsable()) {
PooledConnection.this.owner.discard(PooledConnection.this)
.unsafeSubscribe(subscriber);
} else {
Long keepAliveTimeout = unsafeNettyChannel().attr(DYNAMIC_CONN_KEEP_ALIVE_TIMEOUT_MS).get();
if (null != keepAliveTimeout) {
PooledConnection.this.maxIdleTimeMillis = keepAliveTimeout;
}
markAwarePipeline.reset(); // Reset pipeline state, if changed, on release.
PooledConnection.this.owner.release(PooledConnection.this)
.doOnCompleted(new Action0() {
@Override
public void call() {
releasedAtLeastOnce = true;
lastReturnToPoolTimeMillis = System.currentTimeMillis();
}
})
.unsafeSubscribe(subscriber);
}
}
}).onErrorResumeNext(discard());
代码示例来源:origin: ReactiveX/RxNetty
@Override
public void call(Subscriber<? super PooledConnection<R, W>> subscriber) {
final long startTimeNanos = Clock.newStartTimeNanos();
if (limitDeterminationStrategy.acquireCreationPermit(startTimeNanos, NANOSECONDS)) {
Observable<Connection<R, W>> newConnObsv = hostConnector.getConnectionProvider()
.newConnectionRequest();
newConnObsv.map(new Func1<Connection<R, W>, PooledConnection<R, W>>() {
@Override
public PooledConnection<R, W> call(Connection<R, W> connection) {
return PooledConnection.create(PooledConnectionProviderImpl.this,
maxIdleTimeMillis, connection);
}
}).doOnError(new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
limitDeterminationStrategy.releasePermit(); /*Before connect we acquired.*/
}
}).unsafeSubscribe(subscriber);
} else {
idleConnectionsHolder.poll()
.switchIfEmpty(Observable.<PooledConnection<R, W>>error(
new PoolExhaustedException("Client connection pool exhausted.")))
.unsafeSubscribe(subscriber);
}
}
});
代码示例来源:origin: ReactiveX/RxNetty
@Override
public void call(Subscriber<? super Connection<R, W>> subscriber) {
if (isShutdown) {
subscriber.onError(new IllegalStateException("Connection provider is shutdown."));
}
idleConnectionsHolder.pollThisEventLoopConnections()
.concatWith(connectIfAllowed())
.filter(new Func1<PooledConnection<R, W>, Boolean>() {
@Override
public Boolean call(PooledConnection<R, W> c) {
boolean isUsable = c.isUsable();
if (!isUsable) {
discardNow(c);
}
return isUsable;
}
})
.take(1)
.lift(new ReuseSubscriberLinker())
.lift(new ConnectMetricsOperator())
.unsafeSubscribe(subscriber);
}
});
代码示例来源:origin: akarnokd/RxJava2Interop
@Override
protected void subscribeActual(org.reactivestreams.Subscriber<? super T> s) {
ObservableSubscriber<T> parent = new ObservableSubscriber<T>(s);
ObservableSubscriberSubscription parentSubscription = new ObservableSubscriberSubscription(parent);
s.onSubscribe(parentSubscription);
source.unsafeSubscribe(parent);
}
代码示例来源:origin: akarnokd/RxJava2Interop
@Override
protected void subscribeActual(io.reactivex.Observer<? super T> s) {
ObservableSubscriber<T> parent = new ObservableSubscriber<T>(s);
s.onSubscribe(parent);
source.unsafeSubscribe(parent);
}
代码示例来源:origin: com.netflix.rxjava/rxjava-core
@Override
public void call(Subscriber<? super T> t1) {
target.unsafeSubscribe(t1);
}
};
代码示例来源:origin: com.netflix.rxjava/rxjava-core
@Override
public void onCompleted() {
if (!subscribed) {
subscribed = true;
delayed.unsafeSubscribe(s);
}
}
};
代码示例来源:origin: davidmoten/rxjava-jdbc
.execute(query.returnGeneratedKeysFunction());
Subscriber<T> sub = createSubscriber(subscriber);
o.unsafeSubscribe(sub);
代码示例来源:origin: com.github.davidmoten/rxjava-extras
@Override
public void call(Subscriber<? super T> subscriber) {
if (refresh.compareAndSet(true, false)) {
current = source.cache();
}
current.unsafeSubscribe(subscriber);
}
代码示例来源:origin: com.netflix.rxjava/rxjava-core
@Override
public void call() {
if (!s.isUnsubscribed()) {
source.unsafeSubscribe(s);
}
}
}, time, unit);
代码示例来源:origin: davidmoten/rxjava-extras
@Override
public void call(final Subscriber<? super T> child) {
Subscriber<T> parent = createSubscriber(child, onEmpty);
observable.unsafeSubscribe(parent);
}
代码示例来源:origin: com.github.akarnokd/rxjava2-interop
@Override
protected void subscribeActual(org.reactivestreams.Subscriber<? super T> s) {
ObservableSubscriber<T> parent = new ObservableSubscriber<T>(s);
ObservableSubscriberSubscription parentSubscription = new ObservableSubscriberSubscription(parent);
s.onSubscribe(parentSubscription);
source.unsafeSubscribe(parent);
}
代码示例来源:origin: com.netflix.rxjava/rxjava-core
@Override
public void call(Subscriber<? super T> s) {
Observable<? extends T> o;
try {
o = observableFactory.call();
} catch (Throwable t) {
s.onError(t);
return;
}
o.unsafeSubscribe(s);
}
代码示例来源:origin: davidmoten/rxjava-extras
@Override
public void call(Subscriber<? super T> child) {
final MapLastSubscriber<T> parent = new MapLastSubscriber<T>(child, function);
child.add(parent);
child.setProducer(new Producer() {
@Override
public void request(long n) {
parent.requestMore(n);
}
});
source.unsafeSubscribe(parent);
}
代码示例来源:origin: com.github.davidmoten/rxjava-extras
@Override
public Subscriber<? super T> call(Subscriber<? super R> subscriber) {
final ParentSubscriber<T> parent = new ParentSubscriber<T>();
Observable<T> middle = Observable.create(new ForwarderOnSubscribe<T>(parent));
subscriber.add(parent);
operation.call(middle).unsafeSubscribe(subscriber);
return parent;
}
代码示例来源:origin: davidmoten/rxjava-extras
@Override
public void call(Subscriber<? super T> t) {
OnTerminateResumeSubscriber<T> parent = new OnTerminateResumeSubscriber<T>(t, onError, onCompleted);
t.add(parent);
t.setProducer(parent.arbiter);
o.unsafeSubscribe(parent);
}
});
内容来源于网络,如有侵权,请联系作者删除!