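This article collects code examples for the rx.Observable.publish() method in Java and shows how Observable.publish() is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven. They were extracted from selected projects, so they should serve as a useful reference. Details of the Observable.publish() method are as follows: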
Package path: rx.Observable
Class name: Observable
Method name: publish
Returns a ConnectableObservable, which waits until its ConnectableObservable#connect method is called before it begins emitting items to those Observers that have subscribed to it.
Backpressure Support: This operator does not support backpressure because multicasting means the stream is "hot" with multiple subscribers. Each child will need to manage backpressure independently using operators such as #onBackpressureDrop and #onBackpressureBuffer.
Scheduler: publish does not operate by default on a particular Scheduler.
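To make the connect() behavior described above concrete, here is a minimal plain-Java sketch that does not use RxJava at all. MiniConnectable and MiniConnectableDemo are hypothetical names invented for this illustration. The toy class only models the idea that subscribing registers an observer and that nothing is emitted until connect() is called, at which point every item is delivered to all registered subscribers. It ignores backpressure, threading, errors, and unsubscription, all of which the real operator has to handle.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

// Toy model of a connectable (multicast) source. This is an illustration only,
// NOT the RxJava implementation; all names here are hypothetical.
final class MiniConnectable<T> {
    private final List<T> items;
    private final List<Consumer<T>> subscribers = new ArrayList<>();

    MiniConnectable(List<T> items) {
        this.items = items;
    }

    // Subscribing only registers the consumer; nothing is emitted yet.
    void subscribe(Consumer<T> subscriber) {
        subscribers.add(subscriber);
    }

    // connect() starts a single pass over the source that is shared by
    // every subscriber registered so far.
    void connect() {
        for (T item : items) {
            for (Consumer<T> subscriber : subscribers) {
                subscriber.accept(item);
            }
        }
    }
}

final class MiniConnectableDemo {
    // Returns the order in which events were observed.
    static List<String> run() {
        List<String> log = new ArrayList<>();
        MiniConnectable<Integer> source = new MiniConnectable<>(Arrays.asList(1, 2));
        source.subscribe(v -> log.add("A" + v));
        source.subscribe(v -> log.add("B" + v));
        log.add("before connect"); // both subscribers are registered, yet silent
        source.connect();          // emission begins here for both subscribers
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

In this sketch the log starts with "before connect" and only then records A1, B1, A2, B2: both subscribers see the same single pass over the source, which is the multicasting idea behind publish() followed by connect().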
Code example source: apache/usergrid
@Test
@Category(ExperimentalTest.class )
public void testConnectableObserver() throws InterruptedException {
    final int count = 10;
    final CountDownLatch latch = new CountDownLatch( count );
    final ConnectableObservable<Integer> connectedObservable = Observable.range( 0, count ).publish();
    //connect to our latch, which should run on its own subscription
    //start our latch running
    connectedObservable.doOnNext( integer -> latch.countDown() ).subscribeOn( Schedulers.io() ).subscribe();
    final Observable<Integer> countObservable = connectedObservable.subscribeOn( Schedulers.io() ).count();
    //start the sequence
    connectedObservable.connect();
    final boolean completed = latch.await( 5, TimeUnit.SECONDS );
    assertTrue( "publish1 behaves as expected", completed );
    final int returnedCount = countObservable.toBlocking().last();
    assertEquals( "Counts the same", count, returnedCount );
}
Code example source: cn-ljb/rxjava_for_android
@Override
public void onStart() {
    super.onStart();
    // convert the ordinary Observable into a ConnectableObservable
    ConnectableObservable<Object> publish = _rxBus.toObserverable().publish();
    publish.compose(this.bindToLifecycle())
            .subscribe(new Action1<Object>() { // a listener that shows the tap text whenever it is triggered
                @Override
                public void call(Object event) {
                    if (event instanceof RxBusDemoFragment.TapEvent) {
                        _showTapText();
                    }
                }
            });
    publish.compose(this.bindUntilEvent(FragmentEvent.DESTROY))
            .publish(new Func1<Observable<Object>, Observable<List<Object>>>() { // a listener that, once triggered, buffers the taps and displays their count
                @Override
                public Observable<List<Object>> call(Observable<Object> stream) {
                    return stream.buffer(stream.debounce(1, TimeUnit.SECONDS)); // buffer taps until a 1-second pause, then emit them as one list
                }
            }).observeOn(AndroidSchedulers.mainThread()).subscribe(new Action1<List<Object>>() {
                @Override
                public void call(List<Object> taps) {
                    _showTapCount(taps.size());
                }
            });
    publish.connect(); // a ConnectableObservable does not start emitting on subscribe; connect() must be called explicitly
}
Code example source: jhusain/learnrxjava
public static void main(String args[]) {
    // debounce to the last value in each burst
    //intermittentBursts().debounce(10, TimeUnit.MILLISECONDS).toBlocking().forEach(System.out::println);
    /* The following will emit a buffered list as it is debounced */
    // first we multicast the stream ... using refCount so it handles the subscribe/unsubscribe
    Observable<Integer> burstStream = intermittentBursts().take(20).publish().refCount();
    // then we get the debounced version
    Observable<Integer> debounced = burstStream.debounce(10, TimeUnit.MILLISECONDS);
    // then the buffered one that uses the debounced stream to demark window start/stop
    Observable<List<Integer>> buffered = burstStream.buffer(debounced);
    // then we subscribe to the buffered stream so it does what we want
    buffered.toBlocking().forEach(System.out::println);
}
Code example source: THEONE10211024/RxJavaSamples
@Override
public void onStart() {
    super.onStart();
    _subscriptions = new CompositeSubscription();
    ConnectableObservable<Object> tapEventEmitter = _rxBus.toObserverable().publish();
    _subscriptions//
            .add(tapEventEmitter.subscribe(new Action1<Object>() {
                @Override
                public void call(Object event) {
                    if (event instanceof RxBusDemoFragment.TapEvent) {
                        _showTapText();
                    }
                }
            }));
    _subscriptions//
            .add(tapEventEmitter.publish(new Func1<Observable<Object>, Observable<List<Object>>>() {
                @Override
                public Observable<List<Object>> call(Observable<Object> stream) {
                    return stream.buffer(stream.debounce(1, TimeUnit.SECONDS));
                }
            }).observeOn(AndroidSchedulers.mainThread()).subscribe(new Action1<List<Object>>() {
                @Override
                public void call(List<Object> taps) {
                    _showTapCount(taps.size());
                }
            }));
    _subscriptions.add(tapEventEmitter.connect());
}
Code example source: spotify/mobius
@Override
public Observable<R> call(Observable<T> input) {
    return input.publish(
        new Func1<Observable<T>, Observable<R>>() {
            @Override
            public Observable<R> call(Observable<T> innerInput) {
                final List<Observable<R>> transformed = new ArrayList<>();
                for (Observable.Transformer<T, R> transformer : transformers) {
                    transformed.add(innerInput.compose(transformer));
                }
                return Observable.merge(transformed);
            }
        });
}
}
Code example source: com.github.cloudfoundry-community/snotel
@Override
public Observable<Envelope> open() {
    if (closed) {
        throw new IllegalStateException("The firehose client is closed.");
    }
    return Observable
            .create(onSubscribe)
            .publish()
            .refCount();
}
};
Code example source: stephanenicolas/toothpick
@Inject
public RxPresenter() {
    timeObservable = Observable.interval(1, TimeUnit.SECONDS) //
        .subscribeOn(Schedulers.newThread()) //
        .observeOn(AndroidSchedulers.mainThread()) //
        .publish();
    connect = timeObservable.connect();
}
Code example source: ordina-jworks/microservices-dashboard-server
protected Observable<String> getServiceIdsFromDiscoveryClient() {
    logger.info("Discovering services for health");
    return Observable.from(discoveryClient.getServices()).subscribeOn(Schedulers.io()).publish().autoConnect()
            .map(id -> id.toLowerCase())
            .filter(id -> !id.equals(ZUUL))
            .doOnNext(s -> logger.debug("Service discovered: " + s))
            .doOnError(e -> errorHandler.handleSystemError("Error filtering services: " + e.getMessage(), e))
            .retry();
}
Code example source: ordina-jworks/microservices-dashboard-server
private Observable<String> getServicesFromDiscoveryClient() {
    logger.info("Discovering services");
    return Observable.from(discoveryClient.getServices()).subscribeOn(Schedulers.io()).publish().autoConnect()
            .map(String::toLowerCase)
            .doOnNext(s -> logger.debug("Service discovered: " + s))
            .doOnError(e -> {
                String error = "Error retrieving services: " + e.getMessage();
                logger.error(error);
                publisher.publishEvent(new SystemEvent(error, e));
            })
            .retry();
}
Code example source: ordina-jworks/microservices-dashboard-server
protected Observable<String> getServiceIdsFromDiscoveryClient() {
    logger.info("Discovering services for mappings");
    return Observable.from(discoveryClient.getServices()).subscribeOn(Schedulers.io()).publish().autoConnect()
            .map(id -> id.toLowerCase())
            .filter(id -> !id.equals(ZUUL))
            .doOnNext(s -> logger.debug("Service discovered: " + s))
            .doOnError(e -> errorHandler.handleSystemError("Error filtering services: " + e.getMessage(), e))
            .retry();
}
Code example source: com.github.kmbulebu.nicknack/nicknack-core
private ProviderServiceImpl(final Path providersDirectory, final XMLConfiguration configuration) {
    this.configuration = configuration;
    loader = new ProviderLoader(providersDirectory);
    eventStream = Observable.create(this).publish();
    eventStream.connect();
    // TODO Do something smart with the errors.
    initializeProviders();
}
Code example source: akarnokd/RxAgera
public RxObservableAsAgera(rx.Observable<?> source) {
    if (source instanceof Subject) {
        // no need to publish/autoConnect a subject
        this.source = source.doAfterTerminate(this);
    } else {
        this.source = source.doAfterTerminate(this).publish().autoConnect();
    }
    this.updatables = new HashMap<>();
}
Code example source: dswarm/dswarm
protected ConnectableObservable<GDMModel> transformResultModel(final Observable<org.dswarm.persistence.model.internal.Model> model) {
    final AtomicInteger resultCounter = new AtomicInteger(0);
    return model
            .onBackpressureBuffer(10000)
            .doOnSubscribe(() -> GDMModelTransformationFlow.LOG.debug("subscribed to results observable in transformation engine"))
            .doOnNext(resultObj -> {
                resultCounter.incrementAndGet();
                if (resultCounter.get() == 1) {
                    GDMModelTransformationFlow.LOG.debug("received first result in transformation engine");
                }
            })
            .doOnCompleted(() -> GDMModelTransformationFlow.LOG.debug("received '{}' results in transformation engine overall", resultCounter.get()))
            .cast(org.dswarm.persistence.model.internal.gdm.GDMModel.class)
            .onBackpressureBuffer(10000)
            .publish();
}
Code example source: dswarm/dswarm
protected ConnectableObservable<org.dswarm.persistence.model.internal.Model> doPostProcessingOfResultModel(final GDMModelReceiver writer, final Scheduler scheduler) {
    final ConnectableObservable<GDMModel> modelConnectableObservable = writer.getObservable()
            .observeOn(scheduler)
            .onBackpressureBuffer(10000)
            .publish();
    final ConnectableObservable<org.dswarm.persistence.model.internal.Model> model = doPostProcessingOfResultModel(modelConnectableObservable);
    modelConnectableObservable.connect();
    return model;
}
Code example source: org.springframework.cloud/spring-cloud-bus-turbine
@Bean
public HttpServer<ByteBuf, ServerSentEvent> aggregatorServer() {
    // multicast so multiple concurrent subscribers get the same stream
    Observable<Map<String, Object>> publishedStreams = StreamAggregator.aggregateGroupedStreams(hystrixSubject()
            .groupBy(data -> InstanceKey.create((String) data.get("instanceId"))))
            .doOnUnsubscribe(() -> log.info("BusTurbine => Unsubscribing aggregation."))
            .doOnSubscribe(() -> log.info("BusTurbine => Starting aggregation"))
            .flatMap(o -> o).publish().refCount();
    int port = new Integer(env.getProperty("server.port", "8989"));
    HttpServer<ByteBuf, ServerSentEvent> httpServer = RxNetty.createHttpServer(port, (request, response) -> {
        log.info("BusTurbine => SSE Request Received");
        response.getHeaders().setHeader("Content-Type", "text/event-stream");
        return publishedStreams
                .doOnUnsubscribe(() -> log.info("BusTurbine => Unsubscribing RxNetty server connection"))
                .flatMap(data -> response.writeAndFlush(new ServerSentEvent(null, null, JsonUtility.mapToJson(data))));
    }, sseServerConfigurator());
    return httpServer;
}
Code example source: nurkiewicz/rxjava-book-examples
@Test
public void sample_186() throws Exception {
    Observable<Status> observable = status();
    Observable<Status> lazy = observable.publish().refCount();
    //...
    System.out.println("Before subscribers");
    Subscription sub1 = lazy.subscribe();
    System.out.println("Subscribed 1");
    Subscription sub2 = lazy.subscribe();
    System.out.println("Subscribed 2");
    sub1.unsubscribe();
    System.out.println("Unsubscribed 1");
    sub2.unsubscribe();
    System.out.println("Unsubscribed 2");
}
Code example source: akarnokd/akarnokd-misc
public static void main(String[] args) {
    Observable.range(1, 10)
    .doOnSubscribe(() -> System.out.println("Subscribed"))
    .publish(o -> Observable.zip(o.map(v -> v * 10), o.map(v -> v * 20), (a, b) -> a + "-" + b))
    .subscribe(System.out::println, Throwable::printStackTrace);
}
}
Code example source: nurkiewicz/rxjava-book-examples
@Test
public void sample_206() throws Exception {
    final Observable<Status> tweets = status();
    ConnectableObservable<Status> published = tweets.publish();
    published.connect();
}
Code example source: akarnokd/akarnokd-misc
static <T> Observable.Transformer<T, T> debounceFirst(long timeout, TimeUnit unit) {
    return f ->
        f.publish(g ->
            g.take(1)
            .concatWith(
                g.switchMap(u -> Observable.timer(timeout, unit).map(w -> u))
                .take(1)
                .ignoreElements()
            )
            .repeatWhen(h -> h.takeUntil(g.ignoreElements()))
        )
    ;
}
}
Code example source: nurkiewicz/rxjava-book-examples
@Test
public void sample_48() throws Exception {
    ConnectableObservable<Long> upstream = Observable
            .interval(99, MILLISECONDS)
            .publish();
    upstream
            .debounce(100, MILLISECONDS)
            .timeout(1, SECONDS, upstream.take(1));
    upstream.connect();
}