Usage and code examples for the rx.Observable.interval() method

x33g5p2x, reposted on 2022-01-25 in Other

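This article collects code examples for the Java method rx.Observable.interval() and shows how Observable.interval() is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven and were extracted from selected projects, so they should be a useful reference. Details of the Observable.interval() method:
Package path: rx.Observable
Class name: Observable
Method name: interval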

About Observable.interval

Returns an Observable that emits a sequential number every specified interval of time.

Scheduler: interval operates by default on the computation Scheduler.
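
The emission schedule described above can be modelled without RxJava. The following is a minimal plain-Java sketch, not the library's implementation: IntervalModel and emittedBy are hypothetical names introduced here, and the model assumes that ticks are numbered from 0 and that the first tick fires once the initial delay has elapsed, which is what the TestScheduler example further down this page asserts.

```java
import java.util.ArrayList;
import java.util.List;

public class IntervalModel {

    /**
     * Hypothetical helper: lists the values a ticker would have emitted by
     * elapsedMs, with the first tick at initialDelayMs and one more tick
     * every periodMs after that. Pure arithmetic, no threads or timers.
     */
    public static List<Long> emittedBy(long initialDelayMs, long periodMs, long elapsedMs) {
        if (periodMs <= 0) {
            throw new IllegalArgumentException("periodMs must be positive");
        }
        List<Long> emitted = new ArrayList<>();
        long next = 0; // assumption: the sequence starts at 0
        for (long t = initialDelayMs; t <= elapsedMs; t += periodMs) {
            emitted.add(next++);
        }
        return emitted;
    }

    public static void main(String[] args) {
        // Period 200 ms with an equal initial delay, observed after 1000 ms.
        System.out.println(emittedBy(200, 200, 1000)); // [0, 1, 2, 3, 4]
        // Initial delay 0, period 1000 ms, observed after 3000 ms.
        System.out.println(emittedBy(0, 1000, 3000));  // [0, 1, 2, 3]
    }
}
```

With a 400 ms period and a 1000 ms window the model yields two ticks, which matches the "1 + 2" comment in the Netflix/servo test below once each tick is mapped to its value plus one.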

Code examples

Code example source: origin: Netflix/conductor

@VisibleForTesting
private OnSubscribe<Message> getOnSubscribe() {
  return subscriber -> {
    Observable<Long> interval = Observable.interval(pollTimeInMS, TimeUnit.MILLISECONDS);
    interval.flatMap((Long x) -> {
      List<Message> msgs = receiveMessages();
      return Observable.from(msgs);
    }).subscribe(subscriber::onNext, subscriber::onError);
  };
}

Code example source: origin: Netflix/conductor

@VisibleForTesting
OnSubscribe<Message> getOnSubscribe() {
  return subscriber -> {
    Observable<Long> interval = Observable.interval(pollTimeInMS, TimeUnit.MILLISECONDS);
    interval.flatMap((Long x)->{
      List<Message> msgs = receiveMessages();
      return Observable.from(msgs);
    }).subscribe(subscriber::onNext, subscriber::onError);
  };
}

Code example source: origin: Rukey7/MvpApp

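/**
 * Countdown.
 * @param time number of seconds to count down from; negative values are treated as 0
 * @return an Observable that emits time, time - 1, ..., 0, one value per second
 */
public static Observable<Integer> countdown(int time) {
  if (time < 0) {
    time = 0;
  }
  final int countTime = time;

  // Tick immediately, then once per second; map tick n to the seconds remaining.
  return Observable.interval(0, 1, TimeUnit.SECONDS)
      .map(new Func1<Long, Integer>() {
        @Override
        public Integer call(Long increaseTime) {
          return countTime - increaseTime.intValue();
        }
      })
      .take(countTime + 1)
      .subscribeOn(Schedulers.io())
      .unsubscribeOn(Schedulers.io())
      // Redundant: the subscribeOn closest to the source (io, above) takes effect.
      .subscribeOn(AndroidSchedulers.mainThread())
      .observeOn(AndroidSchedulers.mainThread());
}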
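
To see why the countdown above takes countTime + 1 items rather than countTime, the mapping can be reproduced in plain Java. This is only a sketch under the assumption that ticks start at 0; CountdownModel and countdownValues are hypothetical names, and no timing or RxJava is involved.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.LongStream;

public class CountdownModel {

    /**
     * Hypothetical helper: the values the countdown would emit, in order.
     * Ticks 0..countTime stand in for interval(...).take(countTime + 1).
     */
    public static List<Integer> countdownValues(int time) {
        final int countTime = Math.max(time, 0); // same clamping as the original
        return LongStream.rangeClosed(0, countTime)
                .mapToObj(tick -> Integer.valueOf(countTime - (int) tick))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Both endpoints are emitted, so a 3-second countdown has 4 values.
        System.out.println(countdownValues(3)); // [3, 2, 1, 0]
    }
}
```

Taking only countTime items would stop at 1 and never emit the final 0.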

Code example source: origin: Netflix/conductor

Observable<Long> interval = Observable.interval(100, TimeUnit.MILLISECONDS);
interval.flatMap((Long x) -> {
  List<Message> available = new LinkedList<>();

Code example source: origin: bluelinelabs/Conductor

@NonNull
@Override
protected View onCreateView(@NonNull LayoutInflater inflater, @NonNull ViewGroup container) {
  Log.i(TAG, "onCreateView() called");
  View view = inflater.inflate(R.layout.controller_lifecycle, container, false);
  view.setBackgroundColor(ContextCompat.getColor(container.getContext(), R.color.red_300));
  unbinder = ButterKnife.bind(this, view);
  tvTitle.setText(getResources().getString(R.string.rxlifecycle_title, TAG));
  Observable.interval(1, TimeUnit.SECONDS)
      .doOnUnsubscribe(new Action0() {
        @Override
        public void call() {
          Log.i(TAG, "Unsubscribing from onCreateView()");
        }
      })
      .compose(this.<Long>bindUntilEvent(ControllerEvent.DESTROY_VIEW))
      .subscribe(new Action1<Long>() {
        @Override
        public void call(Long num) {
          Log.i(TAG, "Started in onCreateView(), running until onDestroyView(): " + num);
        }
      });
  return view;
}

Code example source: origin: bluelinelabs/Conductor

public RxLifecycleController() {
  Observable.interval(1, TimeUnit.SECONDS)
      .doOnUnsubscribe(new Action0() {
        @Override
        public void call() {
          Log.i(TAG, "Unsubscribing from constructor");
        }
      })
      .compose(this.<Long>bindUntilEvent(ControllerEvent.DESTROY))
      .subscribe(new Action1<Long>() {
        @Override
        public void call(Long num) {
          Log.i(TAG, "Started in constructor, running until onDestroy(): " + num);
        }
      });
}

Code example source: origin: PipelineAI/pipeline

/**
 * @deprecated Not for public use.  Please use {@link #getInstance()}.  This facilitates better stream-sharing
 * @param intervalInMilliseconds milliseconds between data emissions
 */
@Deprecated //deprecated in 1.5.4.
public HystrixUtilizationStream(final int intervalInMilliseconds) {
  this.intervalInMilliseconds = intervalInMilliseconds;
  this.allUtilizationStream = Observable.interval(intervalInMilliseconds, TimeUnit.MILLISECONDS)
      .map(getAllUtilization)
      .doOnSubscribe(new Action0() {
        @Override
        public void call() {
          isSourceCurrentlySubscribed.set(true);
        }
      })
      .doOnUnsubscribe(new Action0() {
        @Override
        public void call() {
          isSourceCurrentlySubscribed.set(false);
        }
      })
      .share()
      .onBackpressureDrop();
}

Code example source: origin: PipelineAI/pipeline

/**
 * @deprecated Not for public use.  Please use {@link #getInstance()}.  This facilitates better stream-sharing
 * @param intervalInMilliseconds milliseconds between data emissions
 */
@Deprecated //deprecated in 1.5.4.
public HystrixConfigurationStream(final int intervalInMilliseconds) {
  this.intervalInMilliseconds = intervalInMilliseconds;
  this.allConfigurationStream = Observable.interval(intervalInMilliseconds, TimeUnit.MILLISECONDS)
      .map(getAllConfig)
      .doOnSubscribe(new Action0() {
        @Override
        public void call() {
          isSourceCurrentlySubscribed.set(true);
        }
      })
      .doOnUnsubscribe(new Action0() {
        @Override
        public void call() {
          isSourceCurrentlySubscribed.set(false);
        }
      })
      .share()
      .onBackpressureDrop();
}

Code example source: origin: PipelineAI/pipeline

private HystrixDashboardStream(int delayInMs) {
  this.delayInMs = delayInMs;
  this.singleSource = Observable.interval(delayInMs, TimeUnit.MILLISECONDS)
      .map(new Func1<Long, DashboardData>() {
        @Override
        public DashboardData call(Long timestamp) {
          return new DashboardData(
              HystrixCommandMetrics.getInstances(),
              HystrixThreadPoolMetrics.getInstances(),
              HystrixCollapserMetrics.getInstances()
          );
        }
      })
      .doOnSubscribe(new Action0() {
        @Override
        public void call() {
          isSourceCurrentlySubscribed.set(true);
        }
      })
      .doOnUnsubscribe(new Action0() {
        @Override
        public void call() {
          isSourceCurrentlySubscribed.set(false);
        }
      })
      .share()
      .onBackpressureDrop();
}

Code example source: origin: Netflix/servo

@Test
public void testSendAllSlow() throws Exception {
  // Emits 1, 2, 3, ... with one value every 400 ms.
  Observable<Integer> interval = Observable.interval(400, TimeUnit.MILLISECONDS)
      .map(l -> l.intValue() + 1);
  // now add an observable that should timeout
  List<Observable<Integer>> batches = new ArrayList<>();
  batches.add(interval);
  int expectedSum = 3; // 1 + 2 should have been received from interval
  for (int i = 1; i <= 5; ++i) {
    batches.add(Observable.just(i));
    expectedSum += i;
  }
  HttpHelper httpHelper = new HttpHelper(null);
  int partial = httpHelper.sendAll(batches, expectedSum, 1000L);
  // JUnit convention: expected value first, actual value second.
  assertEquals(expectedSum, partial);
}

Code example source: origin: bluelinelabs/Conductor

@Override
protected void onAttach(@NonNull View view) {
  super.onAttach(view);
  Log.i(TAG, "onAttach() called");
  (((ActionBarProvider)getActivity()).getSupportActionBar()).setTitle("RxLifecycle Demo");
  Observable.interval(1, TimeUnit.SECONDS)
      .doOnUnsubscribe(new Action0() {
        @Override
        public void call() {
          Log.i(TAG, "Unsubscribing from onAttach()");
        }
      })
      .compose(this.<Long>bindUntilEvent(ControllerEvent.DETACH))
      .subscribe(new Action1<Long>() {
        @Override
        public void call(Long num) {
          Log.i(TAG, "Started in onAttach(), running until onDetach(): " + num);
        }
      });
}

Code example source: origin: jhusain/learnrxjava

public static void main(String... args) {
  Observable<String> data = Observable.just("one", "two", "three", "four", "five");
  Observable.zip(data, Observable.interval(1, TimeUnit.SECONDS), (d, t) -> {
    return d + " " + t;
  }).toBlocking().forEach(System.out::println);
}
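
In the example above, zip pairs the n-th string with the n-th tick, so the interval only paces the output and the combined stream ends once the five strings are used up. The pairing alone can be sketched in plain Java; ZipModel and zipWithTicks are hypothetical names, timing is left out entirely, and this is not RxJava's zip.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ZipModel {

    /**
     * Hypothetical helper: combines each item with the tick number that
     * would accompany it, assuming ticks are numbered from 0.
     */
    public static List<String> zipWithTicks(List<String> data) {
        List<String> combined = new ArrayList<>();
        for (int tick = 0; tick < data.size(); tick++) {
            combined.add(data.get(tick) + " " + tick);
        }
        return combined;
    }

    public static void main(String[] args) {
        // Prints "one 0", "two 1", "three 2", "four 3", "five 4", one per line.
        zipWithTicks(Arrays.asList("one", "two", "three", "four", "five"))
                .forEach(System.out::println);
    }
}
```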

Code example source: origin: THEONE10211024/RxJavaSamples

/**
 * This method will only be called once when the retained Fragment is first created.
 */
@Override
public void onCreate(Bundle savedInstanceState) {
  super.onCreate(savedInstanceState);
  // Retain this fragment across configuration changes.
  setRetainInstance(true);
  _storedIntsSubscription =//
     Observable.interval(1, TimeUnit.SECONDS)//
        .map(new Func1<Long, Integer>() {
          @Override
          public Integer call(Long aLong) {
            return aLong.intValue();
          }
        })//
        .take(20)//
        .subscribe(_intStream);
}

Code example source: origin: cn-ljb/rxjava_for_android

private void autoLoop() {
  if (subscribe_auto == null || subscribe_auto.isUnsubscribed()) {
    subscribe_auto = Observable.interval(3000, 3000, TimeUnit.MILLISECONDS)
                    // initial delay 3000, period 3000, time unit
        .compose(this.<Long>bindToLifecycle())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Action1<Long>() {
          @Override
          public void call(Long aLong) {
            int currentIndex = mViewPager.getCurrentItem();
            if (++currentIndex == loopAdapter.getCount()) {
              mViewPager.setCurrentItem(0);
            } else {
              mViewPager.setCurrentItem(currentIndex, true);
            }
          }
        });
  }
}

Code example source: origin: THEONE10211024/RxJavaSamples

@OnClick(R.id.btn_demo_timing_4)
public void Btn4_RunTask5Times_IntervalOf3s() {
  _log(String.format("D4 [%s] --- BTN click", _getCurrentTimestamp()));
  Observable//
     .interval(3, TimeUnit.SECONDS).take(5)//
     .subscribe(new Observer<Long>() {
       @Override
       public void onCompleted() {
         _log(String.format("D4 [%s] XXX COMPLETE", _getCurrentTimestamp()));
       }
       @Override
       public void onError(Throwable e) {
         Timber.e(e, "something went wrong in TimingDemoFragment example");
       }
       @Override
       public void onNext(Long number) {
         _log(String.format("D4 [%s]     NEXT", _getCurrentTimestamp()));
       }
     });
}
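
For comparison, a periodic task that stops after a fixed number of runs can also be written with the JDK alone. The sketch below is a rough analog of interval(...).take(n), not a replacement for it: TickerSketch and takeTicks are hypothetical names, it blocks the calling thread until the last tick, and it offers none of RxJava's unsubscription or error handling.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class TickerSketch {

    /**
     * Hypothetical helper: collects tick numbers 0..count-1, one every
     * periodMs (which must be positive), then stops the timer and returns them.
     */
    public static List<Long> takeTicks(long periodMs, int count) {
        if (count <= 0) {
            return new ArrayList<>();
        }
        List<Long> received = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch done = new CountDownLatch(1);
        AtomicLong counter = new AtomicLong();
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        try {
            // The first tick fires after one full period, later ticks at a fixed rate.
            scheduler.scheduleAtFixedRate(() -> {
                long n = counter.getAndIncrement();
                if (n < count) {
                    received.add(n);
                }
                if (n == count - 1) {
                    done.countDown(); // the last wanted tick has arrived
                }
            }, periodMs, periodMs, TimeUnit.MILLISECONDS);
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt flag set
        } finally {
            scheduler.shutdownNow(); // stop ticking, like an unsubscribe
        }
        return new ArrayList<>(received);
    }

    public static void main(String[] args) {
        System.out.println(takeTicks(100, 5)); // [0, 1, 2, 3, 4]
    }
}
```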

Code example source: origin: jhusain/learnrxjava

public static void main(String... args) {
  TestScheduler test = Schedulers.test();
  TestSubscriber<String> ts = new TestSubscriber<>();
  Observable.interval(200, TimeUnit.MILLISECONDS, test)
      .map(i -> {
        return i + " value";
      }).subscribe(ts);
  test.advanceTimeBy(200, TimeUnit.MILLISECONDS);
  ts.assertReceivedOnNext(Arrays.asList("0 value"));
  test.advanceTimeTo(1000, TimeUnit.MILLISECONDS);
  ts.assertReceivedOnNext(Arrays.asList("0 value", "1 value", "2 value", "3 value", "4 value"));
}

Code example source: origin: THEONE10211024/RxJavaSamples

@OnClick(R.id.btn_demo_timing_2)
public void Btn2_RunTask_IntervalOf1s() {
  if (_subscription1 != null && !_subscription1.isUnsubscribed()) {
    _subscription1.unsubscribe();
    _log(String.format("B2 [%s] XXX BTN KILLED", _getCurrentTimestamp()));
    return;
  }
  _log(String.format("B2 [%s] --- BTN click", _getCurrentTimestamp()));
  _subscription1 = Observable//
     .interval(1, TimeUnit.SECONDS)//
     .subscribe(new Observer<Long>() {
       @Override
       public void onCompleted() {
         _log(String.format("B2 [%s] XXXX COMPLETE", _getCurrentTimestamp()));
       }
       @Override
       public void onError(Throwable e) {
         Timber.e(e, "something went wrong in TimingDemoFragment example");
       }
       @Override
       public void onNext(Long number) {
         _log(String.format("B2 [%s]     NEXT", _getCurrentTimestamp()));
       }
     });
}

Code example source: origin: THEONE10211024/RxJavaSamples

/**
 * This method will only be called once when the retained Fragment is first created.
 */
@Override
public void onCreate(Bundle savedInstanceState) {
  super.onCreate(savedInstanceState);
  // Retain this fragment across configuration changes.
  setRetainInstance(true);
  if (_storedIntsObservable != null) {
    return;
  }
  Observable<Integer> intsObservable =//
     Observable.interval(1, TimeUnit.SECONDS)//
        .map(new Func1<Long, Integer>() {
          @Override
          public Integer call(Long aLong) {
            return aLong.intValue();
          }
        })//
        .take(20);
  // -----------------------------------------------------------------------------------
  // Making our observable "HOT" for the purpose of the demo.
  //_intsObservable = _intsObservable.share();
  _storedIntsObservable = intsObservable.replay();
  _storedIntsSubscription = _storedIntsObservable.connect();
  // Do not do this in production!
  // `.share` is "warm" not "hot"
  // the below forceful subscription fakes the heat
  //_intsObservable.subscribe();
}

Code example source: origin: stephanenicolas/toothpick

@Inject
public RxPresenter() {
  timeObservable = Observable.interval(1, TimeUnit.SECONDS) //
      .subscribeOn(Schedulers.newThread()) //
      .observeOn(AndroidSchedulers.mainThread()) //
      .publish();
  connect = timeObservable.connect();
}

Code example source: origin: MaksTuev/ferro

private void imitateDownloading(String bookId) {
  List<Integer> percents = Arrays.asList(5, 17, 33, 50, 66, 81, 92, 100);
  Observable.zip(
      Observable.interval(600, TimeUnit.MILLISECONDS),
      Observable.from(percents),
      (t, percent) -> percent)
      .doOnNext(percent -> updateBook(bookId, percent))
      .subscribe();
}
