Usage and code examples of the rx.Scheduler class

x33g5p2x, reposted on 2022-01-30 in Other

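This article collects code examples for the Java class rx.Scheduler and shows how the class is used in practice. The examples come mainly from GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they should be useful as reference material. Details of the Scheduler class:
Package path: rx.Scheduler
Class name: Scheduler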

Scheduler overview

Represents an object that schedules units of work.

Common implementations can be found in Schedulers.

Why is this an abstract class instead of an interface?

  1. Java doesn't support extension methods, and many overloaded methods need default implementations.
  2. Virtual extension methods aren't available until Java 8, which RxJava will not set as a minimum target for a long time.
  3. If only an interface were used, Scheduler implementations would need to extend an AbstractScheduler pair that provides all of the functionality, unless they intend to copy and paste it.
  4. Without virtual extension methods, even additive changes are breaking and thus severely impede library maintenance.
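
The reasoning in the list above can be sketched with plain JDK types. This is a minimal illustration, not RxJava code: MiniScheduler and RecordingScheduler are hypothetical names invented here. The sketch shows how an abstract class lets a subclass implement one method while inheriting the overloads and any method added later.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class AbstractSchedulerSketch {

    /** Hypothetical stand-in for a scheduler base class; not the RxJava API. */
    abstract static class MiniScheduler {
        /** The single method a concrete scheduler must supply. */
        abstract void schedule(Runnable action, long delay, TimeUnit unit);

        /** Overload with a default implementation: schedule with no delay. */
        void schedule(Runnable action) {
            schedule(action, 0, TimeUnit.MILLISECONDS);
        }

        /** An additive method: existing subclasses keep compiling when it is introduced. */
        long now() {
            return System.currentTimeMillis();
        }
    }

    /** Overrides only the abstract method and inherits everything else. */
    static final class RecordingScheduler extends MiniScheduler {
        final List<Long> delaysMillis = new ArrayList<>();

        @Override
        void schedule(Runnable action, long delay, TimeUnit unit) {
            delaysMillis.add(unit.toMillis(delay));
            action.run(); // runs inline; a real scheduler would defer the work
        }
    }

    public static void main(String[] args) {
        RecordingScheduler scheduler = new RecordingScheduler();
        scheduler.schedule(() -> System.out.println("immediate"));
        scheduler.schedule(() -> System.out.println("delayed"), 2, TimeUnit.SECONDS);
        System.out.println(scheduler.delaysMillis); // [0, 2000]
    }
}
```

With an interface on a pre-Java 8 target, every implementation would have to supply schedule(Runnable) and now() itself, and adding now() later would break each existing implementation.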

Code examples

Code example source: BaronZ88/MinimalistWeather

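// First fragment: query the weather through the DAO and emit it.
try {
  Weather weather = weatherDao.queryWeather(cityId);
  subscriber.onNext(weather);
  subscriber.onCompleted();
} catch (SQLException e) {
  throw Exceptions.propagate(e);
}
// ... (lines omitted in the source excerpt)
// Second fragment: store each network result on an io() worker.
observableForGetWeatherFromNetWork = observableForGetWeatherFromNetWork.doOnNext(
    weather -> Schedulers.io().createWorker().schedule(() -> {
      try {
        weatherDao.insertOrUpdateWeather(weather);
        // ... (excerpt ends here in the source)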

Code example source: akarnokd/RxJava2Interop

@Override
public long now(TimeUnit unit) {
  return unit.convert(source.now(), TimeUnit.MILLISECONDS);
}
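
The conversion in this excerpt relies on TimeUnit.convert, which truncates when converting to a coarser unit. A minimal JDK-only sketch of that behavior follows; the helper name and the clock reading of 1500 ms are assumptions made for illustration and are not part of RxJava2Interop.

```java
import java.util.concurrent.TimeUnit;

public class NowConversionSketch {

    /** Hypothetical helper: converts a millisecond clock reading to the requested unit. */
    static long now(long nowMillis, TimeUnit unit) {
        return unit.convert(nowMillis, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) {
        long nowMillis = 1_500L; // assumed clock reading, in milliseconds
        System.out.println(now(nowMillis, TimeUnit.SECONDS));      // 1 (fraction truncated)
        System.out.println(now(nowMillis, TimeUnit.MILLISECONDS)); // 1500
        System.out.println(now(nowMillis, TimeUnit.MICROSECONDS)); // 1500000
    }
}
```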

Code example source: com.netflix.rxjava/rxjava-core

@Override
public void call(final Subscriber<? super T> s) {
  final Worker worker = scheduler.createWorker();
  s.add(worker);
  worker.schedule(new Action0() {
    @Override
    public void call() {
      if (!s.isUnsubscribed()) {
        source.unsafeSubscribe(s);
      }
    }
  }, time, unit);
}

Code example source: com.netflix.rxjava/rxjava-core

@Override
public void onNext(T v) {
  long now = scheduler.now();
  if (lastOnNext == 0 || now - lastOnNext >= timeInMilliseconds) {
    lastOnNext = now;
    subscriber.onNext(v);
  } 
}

Code example source: PipelineAI/pipeline

@Override
public Worker createWorker() {
  return new HystrixContextSchedulerWorker(actualScheduler.createWorker());
}

Code example source: mp911de/spinach

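if (subscriber.isUnsubscribed()) {
  return;
}
// ... (lines omitted in the source excerpt)
subscriber.onStart();
Scheduler.Worker worker = scheduler.createWorker();
// Poll for jobs every 10 ms, starting immediately.
Subscription subscription = worker.schedulePeriodically(getJobsAction, 0, 10, TimeUnit.MILLISECONDS);
getJobsAction.setSelfSubscription(subscription);
// ... (lines omitted in the source excerpt)
reconnectTrigger = worker.schedulePeriodically(new Action0() {
  @Override
  public void call() {
    // ... (body and remaining arguments omitted in the source excerpt)
// ... (lines omitted in the source excerpt)
log.debug("QueueListener.call caught an exception: {}", e.getMessage(), e);
subscriber.onError(e);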

Code example source: colintheshots/RxJavaExamples

// The enclosing class and the constants INITIAL_DELAY, POLLING_INTERVAL and DATE_FORMAT are not shown in the excerpt.
public static void main(String[] args) {
  Observable.create((Subscriber<? super Calendar> subscriber) -> Schedulers.immediate().createWorker()
      .schedulePeriodically(() ->
          subscriber.onNext(Calendar.getInstance(Locale.US)), INITIAL_DELAY, POLLING_INTERVAL, TimeUnit.MILLISECONDS))
    .take(10)
    .observeOn(Schedulers.io())
    .subscribe(calendar -> {
      SimpleDateFormat simpleDateFormat = new SimpleDateFormat(DATE_FORMAT);
      String time = simpleDateFormat.format(calendar.getTime());
      System.out.println(time);
    }, Throwable::printStackTrace);
}

Code example source: com.netflix.rxjava/rxjava-core

public ObserveOnSubscriber(Scheduler scheduler, Subscriber<? super T> child) {
  this.child = child;
  this.recursiveScheduler = scheduler.createWorker();
  this.scheduledUnsubscribe = new ScheduledUnsubscribe(recursiveScheduler, queue);
  child.add(scheduledUnsubscribe);
  child.setProducer(new Producer() {
    @Override
    public void request(long n) {
      REQUESTED.getAndAdd(ObserveOnSubscriber.this, n);
      schedule();
    }
  });
  add(scheduledUnsubscribe);
  child.add(recursiveScheduler);
  child.add(this);
  
}

Code example source: com.netflix.rxjava/rxjava-core

@Override
public void onCompleted() {
  emitItemsOutOfWindow(scheduler.now());
  subscriber.onCompleted();
}

Code example source: hzsweers/RxNormalize

@Override
public Subscriber<? super T> call(final Subscriber<? super T> child) {
  final Scheduler.Worker worker = scheduler.createWorker();
  child.add(worker);
  return new Subscriber<T>(child) {
    private volatile long lastTimestamp = scheduler.now();
    private volatile long nextTimestamp = lastTimestamp;
    private AtomicBoolean isDraining = new AtomicBoolean();
    // ... (excerpt ends here in the source)

Code example source: com.netflix.eureka/eureka2-core

public BaseMessageConnection(
    String name,
    ObservableConnection<Object, Object> connection,
    MessageConnectionMetrics metrics,
    Scheduler expiryScheduler) {
  this.connection = connection;
  this.metrics = metrics;
  this.name = descriptiveName(name);
  schedulerWorker = expiryScheduler.createWorker();
  installAcknowledgementHandler();
  this.startTime = expiryScheduler.now();
  metrics.incrementConnectionCounter();
}

Code example source: com.github.mrstampy/esp

@Override
public void triggerProcessing(final int numSamples) {
  connectionCheck();
  Schedulers.newThread().schedule(new Action1<Scheduler.Inner>() {
    @Override
    public void call(Inner t1) {
      process(getConnection().getCurrentFor(numSamples, getChannel()));
    }
  });
}

Code example source: com.netflix.rxjava/rxjava-core

@Override
public void call(final Subscriber<? super Long> child) {
  Worker worker = scheduler.createWorker();
  child.add(worker);
  worker.schedule(new Action0() {
    @Override
    public void call() {
      try {
        child.onNext(0L);
      } catch (Throwable t) {
        child.onError(t);
        return;
      }
      child.onCompleted();
    }
  }, time, unit);
}

Code example source: com.github.davidmoten/rxjava-extras

@Override
public void onNext(T t) {
  long now = scheduler.now();
  if (nextWindowStartTime == UNSET) {
    nextWindowStartTime = now + windowDurationMs;
    subscriber.onNext(t);
  } else if (now >= nextWindowStartTime) {
    // ensure that we advance the next window start time to just
    // beyond now
    long n = (now - nextWindowStartTime) / windowDurationMs + 1;
    nextWindowStartTime += n * windowDurationMs;
    subscriber.onNext(t);
  }
}
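
The window arithmetic in this excerpt can be checked in isolation. The following is a minimal sketch, not code from rxjava-extras: WindowGate is a hypothetical class, a LongSupplier stands in for scheduler.now(), and the UNSET sentinel value is an assumption.

```java
import java.util.function.LongSupplier;

public class WindowGateSketch {

    /** Hypothetical helper that mirrors the excerpt's fields and branching. */
    static final class WindowGate {
        private static final long UNSET = Long.MIN_VALUE; // assumed sentinel value
        private final LongSupplier clock;                 // stands in for scheduler.now()
        private final long windowDurationMs;
        private long nextWindowStartTime = UNSET;

        WindowGate(LongSupplier clock, long windowDurationMs) {
            this.clock = clock;
            this.windowDurationMs = windowDurationMs;
        }

        /** Returns true when an item arriving now would be passed downstream. */
        boolean tryPass() {
            long now = clock.getAsLong();
            if (nextWindowStartTime == UNSET) {
                nextWindowStartTime = now + windowDurationMs;
                return true;
            } else if (now >= nextWindowStartTime) {
                // skip whole windows so the next start lands just beyond now
                long n = (now - nextWindowStartTime) / windowDurationMs + 1;
                nextWindowStartTime += n * windowDurationMs;
                return true;
            }
            return false;
        }
    }

    public static void main(String[] args) {
        long[] fakeNow = {0};
        WindowGate gate = new WindowGate(() -> fakeNow[0], 100);
        System.out.println(gate.tryPass()); // true: first item, next window starts at 100
        fakeNow[0] = 50;
        System.out.println(gate.tryPass()); // false: still inside the current window
        fakeNow[0] = 250;
        System.out.println(gate.tryPass()); // true: n = 2, next window starts at 300
        fakeNow[0] = 299;
        System.out.println(gate.tryPass()); // false: 299 < 300
    }
}
```

Reading the time through an injected clock, as the excerpt does with scheduler.now(), is what allows the timing logic to be exercised without waiting in real time.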

Code example source: ReactiveX/RxNetty

writeWorker = Schedulers.computation().createWorker();

Code example source: com.netflix.rxjava/rxjava-core

final AtomicReference<Producer> currentProducer = new AtomicReference<Producer>();
final Scheduler.Worker worker = scheduler.createWorker();
child.add(worker);
child.add(sourceSubscriptions);
worker.schedule(new Action0() {
  @Override
  public void call() {
    // ... (lines omitted in the source excerpt)
// ... (lines omitted in the source excerpt)
child.setProducer(new Producer() {
  // ... (excerpt ends here in the source)

Code example source: com.netflix.eureka/eureka2-test-utils

public void start() {
  final long startTime = scheduler.now();
  worker = scheduler.createWorker();
  scheduler.createWorker().schedule(new Action0() {
    @Override
    public void call() {
      // ... (excerpt ends here in the source)

Code example source: com.github.mrstampy/esp

@Override
public void triggerProcessing() {
  connectionCheck();
  Schedulers.newThread().schedule(new Action1<Scheduler.Inner>() {
    @Override
    public void call(Inner t1) {
      process(getConnection().getCurrentFor(getChannel()));
    }
  });
}

Code example source: com.netflix.rxjava/rxjava-core

@Override
public Subscriber<? super Observable<T>> call(final Subscriber<? super T> subscriber) {
  final Worker inner = scheduler.createWorker();
  subscriber.add(inner);
  return new Subscriber<Observable<T>>(subscriber) {
    // ... (excerpt ends here in the source)
