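This article collects code examples for the Java class rx.Scheduler and shows how the Scheduler class is used in practice. The examples come mainly from GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they should serve as a useful reference. Details of the Scheduler class follow: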
Package path: rx.Scheduler
Class name: Scheduler
Represents an object that schedules units of work.
Common implementations can be found in Schedulers.
Why is this an abstract class instead of an interface?
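To make "an object that schedules units of work" concrete, here is a minimal sketch of the scheduler/worker shape that the excerpts below rely on (createWorker(), schedule(...), now(), unsubscribe()). It is not the rx.Scheduler implementation: VirtualTimeScheduler and its nested types are hypothetical names, and the clock is virtual so the behavior is deterministic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in that mimics the shape of a scheduler; NOT the RxJava API.
class VirtualTimeScheduler {
    // One pending unit of work and the virtual time at which it becomes due.
    private static final class Task implements Comparable<Task> {
        final long dueMillis;
        final long seq;        // tie-breaker: equal due times run in FIFO order
        final Runnable action;
        final Worker owner;

        Task(long dueMillis, long seq, Runnable action, Worker owner) {
            this.dueMillis = dueMillis;
            this.seq = seq;
            this.action = action;
            this.owner = owner;
        }

        @Override
        public int compareTo(Task other) {
            int c = Long.compare(dueMillis, other.dueMillis);
            return c != 0 ? c : Long.compare(seq, other.seq);
        }
    }

    // Schedules work and can cancel everything it has scheduled.
    final class Worker {
        private boolean unsubscribed;

        void schedule(Runnable action, long delay, TimeUnit unit) {
            if (unsubscribed) {
                return; // a cancelled worker accepts no new work
            }
            queue.add(new Task(nowMillis + unit.toMillis(delay), counter++, action, this));
        }

        void unsubscribe() {
            unsubscribed = true;
        }
    }

    private final PriorityQueue<Task> queue = new PriorityQueue<>();
    private long nowMillis;
    private long counter;

    Worker createWorker() {
        return new Worker();
    }

    // Current virtual time in milliseconds.
    long now() {
        return nowMillis;
    }

    // Moves the virtual clock forward and runs every task that has become due.
    void advanceTimeBy(long delay, TimeUnit unit) {
        long target = nowMillis + unit.toMillis(delay);
        while (!queue.isEmpty() && queue.peek().dueMillis <= target) {
            Task task = queue.poll();
            nowMillis = task.dueMillis;
            if (!task.owner.unsubscribed) {
                task.action.run();
            }
        }
        nowMillis = target;
    }

    // Small demonstration: two workers, one of which is cancelled before its last task.
    static List<String> demo() {
        VirtualTimeScheduler scheduler = new VirtualTimeScheduler();
        List<String> log = new ArrayList<>();
        Worker a = scheduler.createWorker();
        Worker b = scheduler.createWorker();
        a.schedule(() -> log.add("a@" + scheduler.now()), 200, TimeUnit.MILLISECONDS);
        b.schedule(() -> log.add("b@" + scheduler.now()), 100, TimeUnit.MILLISECONDS);
        a.schedule(() -> log.add("never"), 300, TimeUnit.MILLISECONDS);
        scheduler.advanceTimeBy(250, TimeUnit.MILLISECONDS);
        a.unsubscribe(); // cancels the task that was due at 300 ms
        scheduler.advanceTimeBy(1, TimeUnit.SECONDS);
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [b@100, a@200]
    }
}
```

Tasks run in due-time order regardless of which worker scheduled them, and cancelling a worker drops only that worker's pending tasks. Several excerpts below follow the same pattern when they register a worker with a subscriber via child.add(worker).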
Code example source: origin: BaronZ88/MinimalistWeather
try {
    Weather weather = weatherDao.queryWeather(cityId);
    subscriber.onNext(weather);
    subscriber.onCompleted();
} catch (SQLException e) {
    throw Exceptions.propagate(e);
}
// ... (lines elided in the excerpt)
observableForGetWeatherFromNetWork = observableForGetWeatherFromNetWork.doOnNext(weather -> Schedulers.io().createWorker().schedule(() -> {
    try {
        weatherDao.insertOrUpdateWeather(weather);
        // ... (excerpt truncated here)
Code example source: origin: akarnokd/RxJava2Interop
@Override
public long now(TimeUnit unit) {
    // source.now() reports milliseconds; convert to the unit the caller asked for.
    return unit.convert(source.now(), TimeUnit.MILLISECONDS);
}
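The conversion in the excerpt above relies on TimeUnit.convert, which truncates toward zero when it converts to a coarser unit. A minimal sketch of that behavior follows; NowConversion is a hypothetical helper, and the millisecond reading is passed in directly in place of source.now().

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helper that isolates the unit conversion from the excerpt.
class NowConversion {
    // Converts a millisecond clock reading into the requested unit.
    static long now(long sourceNowMillis, TimeUnit unit) {
        return unit.convert(sourceNowMillis, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) {
        System.out.println(now(1500, TimeUnit.SECONDS));      // 1 (the extra 500 ms is dropped)
        System.out.println(now(1500, TimeUnit.MICROSECONDS)); // 1500000
    }
}
```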
Code example source: origin: com.netflix.rxjava/rxjava-core
@Override
public void call(final Subscriber<? super T> s) {
    final Worker worker = scheduler.createWorker();
    // Tie the worker to the subscriber so unsubscribing cancels the delayed task.
    s.add(worker);
    worker.schedule(new Action0() {
        @Override
        public void call() {
            if (!s.isUnsubscribed()) {
                source.unsafeSubscribe(s);
            }
        }
    }, time, unit);
}
Code example source: origin: com.netflix.rxjava/rxjava-core
@Override
public void onNext(T v) {
    long now = scheduler.now();
    // Forward the first item, then only items arriving after the time window has passed.
    if (lastOnNext == 0 || now - lastOnNext >= timeInMilliseconds) {
        lastOnNext = now;
        subscriber.onNext(v);
    }
}
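The gating rule in the excerpt above can be tried in isolation. The following is a sketch only: ThrottleFirstGate is a hypothetical class, and a LongSupplier stands in for scheduler.now() so the clock can be driven by hand.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongSupplier;

// Hypothetical class that reproduces only the time check from the excerpt.
class ThrottleFirstGate {
    private final long timeInMilliseconds;
    private final LongSupplier clock; // stand-in for scheduler.now()
    private long lastOnNext;          // 0 means "nothing forwarded yet", as in the excerpt

    ThrottleFirstGate(long timeInMilliseconds, LongSupplier clock) {
        this.timeInMilliseconds = timeInMilliseconds;
        this.clock = clock;
    }

    // Returns true when an item arriving now would be forwarded downstream.
    boolean tryPass() {
        long now = clock.getAsLong();
        if (lastOnNext == 0 || now - lastOnNext >= timeInMilliseconds) {
            lastOnNext = now;
            return true;
        }
        return false;
    }

    // Feeds arrival times through a 100 ms gate and collects the ones that pass.
    static List<Long> demo() {
        long[] time = {0};
        ThrottleFirstGate gate = new ThrottleFirstGate(100, () -> time[0]);
        List<Long> passed = new ArrayList<>();
        for (long t : new long[] {1000, 1050, 1099, 1100, 1150, 1300}) {
            time[0] = t;
            if (gate.tryPass()) {
                passed.add(t);
            }
        }
        return passed;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [1000, 1100, 1300]
    }
}
```

Because 0 doubles as the "nothing forwarded yet" marker, a clock that legitimately reads 0 at the first item would let the next item through as well; the sketch keeps that behavior to stay faithful to the excerpt.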
Code example source: origin: PipelineAI/pipeline
@Override
public Worker createWorker() {
    return new HystrixContextSchedulerWorker(actualScheduler.createWorker());
}
Code example source: origin: mp911de/spinach
if (subscriber.isUnsubscribed()) {
    return;
}
// ... (lines elided in the excerpt)
subscriber.onStart();
Scheduler.Worker worker = scheduler.createWorker();
Subscription subscription = worker.schedulePeriodically(getJobsAction, 0, 10, TimeUnit.MILLISECONDS);
getJobsAction.setSelfSubscription(subscription);
reconnectTrigger = worker.schedulePeriodically(new Action0() {
    @Override
    public void call() {
        // ... (lines elided in the excerpt)
        log.debug("QueueListener.call caught an exception: {}", e.getMessage(), e);
        subscriber.onError(e);
        // ... (excerpt truncated here)
Code example source: origin: colintheshots/RxJavaExamples
    public static void main(String[] args) {
        Observable.create((Subscriber<? super Calendar> subscriber) -> Schedulers.immediate().createWorker()
                .schedulePeriodically(() ->
                        subscriber.onNext(Calendar.getInstance(Locale.US)), INITIAL_DELAY, POLLING_INTERVAL, TimeUnit.MILLISECONDS))
                .take(10)
                .observeOn(Schedulers.io())
                .subscribe(calendar -> {
                    SimpleDateFormat simpleDateFormat = new SimpleDateFormat(DATE_FORMAT);
                    String time = simpleDateFormat.format(calendar.getTime());
                    System.out.println(time);
                }, Throwable::printStackTrace);
    }
}
Code example source: origin: com.netflix.rxjava/rxjava-core
public ObserveOnSubscriber(Scheduler scheduler, Subscriber<? super T> child) {
    this.child = child;
    this.recursiveScheduler = scheduler.createWorker();
    this.scheduledUnsubscribe = new ScheduledUnsubscribe(recursiveScheduler, queue);
    child.add(scheduledUnsubscribe);
    child.setProducer(new Producer() {
        @Override
        public void request(long n) {
            REQUESTED.getAndAdd(ObserveOnSubscriber.this, n);
            schedule();
        }
    });
    add(scheduledUnsubscribe);
    child.add(recursiveScheduler);
    child.add(this);
}
Code example source: origin: com.netflix.rxjava/rxjava-core
@Override
public void onCompleted() {
    emitItemsOutOfWindow(scheduler.now());
    subscriber.onCompleted();
}
Code example source: origin: hzsweers/RxNormalize
@Override
public Subscriber<? super T> call(final Subscriber<? super T> child) {
    final Scheduler.Worker worker = scheduler.createWorker();
    child.add(worker);
    return new Subscriber<T>(child) {
        private volatile long lastTimestamp = scheduler.now();
        private volatile long nextTimestamp = lastTimestamp;
        private AtomicBoolean isDraining = new AtomicBoolean();
        // ... (excerpt truncated here)
Code example source: origin: com.netflix.eureka/eureka2-core
public BaseMessageConnection(
        String name,
        ObservableConnection<Object, Object> connection,
        MessageConnectionMetrics metrics,
        Scheduler expiryScheduler) {
    this.connection = connection;
    this.metrics = metrics;
    this.name = descriptiveName(name);
    schedulerWorker = expiryScheduler.createWorker();
    installAcknowledgementHandler();
    this.startTime = expiryScheduler.now();
    metrics.incrementConnectionCounter();
}
Code example source: origin: com.github.mrstampy/esp
@Override
public void triggerProcessing(final int numSamples) {
    connectionCheck();
    // Written against an early RxJava API (Scheduler.Inner); later releases use createWorker() and Scheduler.Worker.
    Schedulers.newThread().schedule(new Action1<Scheduler.Inner>() {
        @Override
        public void call(Inner t1) {
            process(getConnection().getCurrentFor(numSamples, getChannel()));
        }
    });
}
Code example source: origin: com.netflix.rxjava/rxjava-core
@Override
public void call(final Subscriber<? super Long> child) {
    Worker worker = scheduler.createWorker();
    child.add(worker);
    // After the delay, emit a single 0L and complete.
    worker.schedule(new Action0() {
        @Override
        public void call() {
            try {
                child.onNext(0L);
            } catch (Throwable t) {
                child.onError(t);
                return;
            }
            child.onCompleted();
        }
    }, time, unit);
}
Code example source: origin: com.github.davidmoten/rxjava-extras
@Override
public void onNext(T t) {
    long now = scheduler.now();
    if (nextWindowStartTime == UNSET) {
        nextWindowStartTime = now + windowDurationMs;
        subscriber.onNext(t);
    } else if (now >= nextWindowStartTime) {
        // ensure that we advance the next window start time to just
        // beyond now
        long n = (now - nextWindowStartTime) / windowDurationMs + 1;
        nextWindowStartTime += n * windowDurationMs;
        subscriber.onNext(t);
    }
}
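The window arithmetic in the excerpt above is easier to follow with numbers. The sketch below isolates the two lines that advance nextWindowStartTime; WindowAdvance is a hypothetical helper, and it assumes now >= nextWindowStartTime, which is the branch condition in the excerpt.

```java
// Hypothetical helper that isolates the window-advance arithmetic from the excerpt.
class WindowAdvance {
    // Returns the first window boundary strictly after `now`,
    // stepping from nextWindowStartTime in whole window durations.
    static long advance(long nextWindowStartTime, long now, long windowDurationMs) {
        long n = (now - nextWindowStartTime) / windowDurationMs + 1;
        return nextWindowStartTime + n * windowDurationMs;
    }

    public static void main(String[] args) {
        // 130 ms past the boundary with 50 ms windows: n = 130 / 50 + 1 = 3.
        System.out.println(advance(100, 230, 50)); // 250
        // Exactly on the boundary: n = 0 / 50 + 1 = 1.
        System.out.println(advance(100, 100, 50)); // 150
    }
}
```

The integer division skips every window that elapsed without an item, so the boundary stays aligned to the original grid (100, 150, 200, ...) rather than drifting to now + windowDurationMs.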
Code example source: origin: ReactiveX/RxNetty
writeWorker = Schedulers.computation().createWorker();
Code example source: origin: com.netflix.rxjava/rxjava-core
final AtomicReference<Producer> currentProducer = new AtomicReference<Producer>();
final Scheduler.Worker worker = scheduler.createWorker();
child.add(worker);
child.add(sourceSubscriptions);
worker.schedule(new Action0() {
    @Override
    public void call() {
        child.setProducer(new Producer() {
            // ... (excerpt truncated here)
Code example source: origin: com.netflix.eureka/eureka2-test-utils
public void start() {
    final long startTime = scheduler.now();
    worker = scheduler.createWorker();
    scheduler.createWorker().schedule(new Action0() {
        @Override
        public void call() {
            // ... (excerpt truncated here)
Code example source: origin: com.github.mrstampy/esp
@Override
public void triggerProcessing() {
    connectionCheck();
    Schedulers.newThread().schedule(new Action1<Scheduler.Inner>() {
        @Override
        public void call(Inner t1) {
            process(getConnection().getCurrentFor(getChannel()));
        }
    });
}
Code example source: origin: com.netflix.rxjava/rxjava-core
@Override
public Subscriber<? super Observable<T>> call(final Subscriber<? super T> subscriber) {
    final Worker inner = scheduler.createWorker();
    subscriber.add(inner);
    return new Subscriber<Observable<T>>(subscriber) {
        // ... (excerpt truncated here)