Usage and code examples of the rx.Observable.interval() method

Reposted by x33g5p2x on 2022-01-25 in Other

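This article collects code examples for the Java method rx.Observable.interval() and shows how Observable.interval() is used in practice. The examples come mainly from GitHub, Stack Overflow, and Maven, extracted from selected projects, so they should serve as a useful reference. Details of the method:
Fully qualified class: rx.Observable
Class name: Observable
Method name: interval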

About Observable.interval

Returns an Observable that emits a sequential number every specified interval of time.

Scheduler: interval operates by default on the computation Scheduler.
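To make the emission pattern concrete, here is a minimal sketch of the same idea using only the Java standard library rather than RxJava: a fixed-rate timer that hands out 0, 1, 2, ... one value per period. The class `IntervalSketch` and the helper `firstTicks` are hypothetical names introduced for illustration. This is only an analogue of the behaviour described above, not how RxJava implements `interval`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

class IntervalSketch {

    // Hypothetical helper: blocks until `count` ticks have fired, then returns them.
    // The first tick fires after one full period, like interval(period, unit).
    static List<Long> firstTicks(int count, long period, TimeUnit unit) {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        List<Long> ticks = Collections.synchronizedList(new ArrayList<Long>());
        AtomicLong counter = new AtomicLong();          // next sequential number to emit
        CountDownLatch done = new CountDownLatch(count);
        try {
            timer.scheduleAtFixedRate(() -> {
                if (done.getCount() > 0) {              // ignore ticks beyond `count`
                    ticks.add(counter.getAndIncrement());
                    done.countDown();
                }
            }, period, period, unit);
            done.await();                               // wait for the requested ticks
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();         // preserve the interrupt flag
        } finally {
            timer.shutdownNow();                        // stop the timer thread
        }
        return new ArrayList<>(ticks);
    }

    public static void main(String[] args) {
        System.out.println(firstTicks(3, 50, TimeUnit.MILLISECONDS)); // prints [0, 1, 2]
    }
}
```

With RxJava 1.x on the classpath, the comparable call would be along the lines of `Observable.interval(50, TimeUnit.MILLISECONDS).take(3)`, which runs its timer on the computation Scheduler unless a different Scheduler is passed in.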

Code examples

Example source: Netflix/conductor

    @VisibleForTesting
    private OnSubscribe<Message> getOnSubscribe() {
        return subscriber -> {
            // Poll for messages every pollTimeInMS milliseconds.
            Observable<Long> interval = Observable.interval(pollTimeInMS, TimeUnit.MILLISECONDS);
            interval.flatMap((Long x) -> {
                List<Message> msgs = receiveMessages();
                return Observable.from(msgs);
            }).subscribe(subscriber::onNext, subscriber::onError);
        };
    }

Example source: Netflix/conductor

    @VisibleForTesting
    OnSubscribe<Message> getOnSubscribe() {
        return subscriber -> {
            Observable<Long> interval = Observable.interval(pollTimeInMS, TimeUnit.MILLISECONDS);
            interval.flatMap((Long x) -> {
                List<Message> msgs = receiveMessages();
                return Observable.from(msgs);
            }).subscribe(subscriber::onNext, subscriber::onError);
        };
    }

Example source: Rukey7/MvpApp

    /**
     * Countdown.
     *
     * @param time number of seconds to count down from
     * @return an Observable that emits the remaining seconds, ending at 0
     */
    public static Observable<Integer> countdown(int time) {
        if (time < 0) {
            time = 0;
        }
        final int countTime = time;
        // Emit immediately (initial delay 0), then once per second.
        return Observable.interval(0, 1, TimeUnit.SECONDS)
                .map(new Func1<Long, Integer>() {
                    @Override
                    public Integer call(Long increaseTime) {
                        return countTime - increaseTime.intValue();
                    }
                })
                .take(countTime + 1)
                .subscribeOn(Schedulers.io())
                .unsubscribeOn(Schedulers.io())
                // Likely redundant: the subscribeOn nearest the source (io) decides the subscription thread.
                .subscribeOn(AndroidSchedulers.mainThread())
                .observeOn(AndroidSchedulers.mainThread());
    }
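The arithmetic in `countdown` can be checked without any timer: tick n from `interval(0, 1, TimeUnit.SECONDS)` is mapped to `countTime - n`, and `take(countTime + 1)` stops after tick `countTime`, so the last value delivered is 0. Below is a minimal sketch of just that mapping in plain Java. `CountdownSketch` and `countdownValues` are hypothetical names introduced here and are not part of MvpApp.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.LongStream;

class CountdownSketch {

    // Hypothetical helper: the values countdown(time) would deliver, computed without a timer.
    static List<Integer> countdownValues(int time) {
        final int countTime = Math.max(time, 0);          // negative input is clamped to 0
        return LongStream.rangeClosed(0, countTime)       // ticks 0..countTime, i.e. take(countTime + 1)
                .mapToObj(tick -> Integer.valueOf(countTime - (int) tick))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(countdownValues(3)); // prints [3, 2, 1, 0]
    }
}
```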

Example source: Netflix/conductor

    Observable<Long> interval = Observable.interval(100, TimeUnit.MILLISECONDS);
    interval.flatMap((Long x) -> {
        List<Message> available = new LinkedList<>();
        // (snippet is truncated here in the original listing)

Example source: bluelinelabs/Conductor

    @NonNull
    @Override
    protected View onCreateView(@NonNull LayoutInflater inflater, @NonNull ViewGroup container) {
        Log.i(TAG, "onCreateView() called");
        View view = inflater.inflate(R.layout.controller_lifecycle, container, false);
        view.setBackgroundColor(ContextCompat.getColor(container.getContext(), R.color.red_300));
        unbinder = ButterKnife.bind(this, view);
        tvTitle.setText(getResources().getString(R.string.rxlifecycle_title, TAG));
        Observable.interval(1, TimeUnit.SECONDS)
                .doOnUnsubscribe(new Action0() {
                    @Override
                    public void call() {
                        Log.i(TAG, "Unsubscribing from onCreateView()");
                    }
                })
                // Stop emitting when the controller's view is destroyed.
                .compose(this.<Long>bindUntilEvent(ControllerEvent.DESTROY_VIEW))
                .subscribe(new Action1<Long>() {
                    @Override
                    public void call(Long num) {
                        Log.i(TAG, "Started in onCreateView(), running until onDestroyView(): " + num);
                    }
                });
        return view;
    }

Example source: bluelinelabs/Conductor

    public RxLifecycleController() {
        Observable.interval(1, TimeUnit.SECONDS)
                .doOnUnsubscribe(new Action0() {
                    @Override
                    public void call() {
                        Log.i(TAG, "Unsubscribing from constructor");
                    }
                })
                .compose(this.<Long>bindUntilEvent(ControllerEvent.DESTROY))
                .subscribe(new Action1<Long>() {
                    @Override
                    public void call(Long num) {
                        Log.i(TAG, "Started in constructor, running until onDestroy(): " + num);
                    }
                });
    }

Example source: PipelineAI/pipeline

    /**
     * @deprecated Not for public use. Please use {@link #getInstance()}. This facilitates better stream-sharing
     * @param intervalInMilliseconds milliseconds between data emissions
     */
    @Deprecated //deprecated in 1.5.4.
    public HystrixUtilizationStream(final int intervalInMilliseconds) {
        this.intervalInMilliseconds = intervalInMilliseconds;
        this.allUtilizationStream = Observable.interval(intervalInMilliseconds, TimeUnit.MILLISECONDS)
                .map(getAllUtilization)
                .doOnSubscribe(new Action0() {
                    @Override
                    public void call() {
                        isSourceCurrentlySubscribed.set(true);
                    }
                })
                .doOnUnsubscribe(new Action0() {
                    @Override
                    public void call() {
                        isSourceCurrentlySubscribed.set(false);
                    }
                })
                .share()
                .onBackpressureDrop();
    }

Example source: PipelineAI/pipeline

    /**
     * @deprecated Not for public use. Please use {@link #getInstance()}. This facilitates better stream-sharing
     * @param intervalInMilliseconds milliseconds between data emissions
     */
    @Deprecated //deprecated in 1.5.4.
    public HystrixConfigurationStream(final int intervalInMilliseconds) {
        this.intervalInMilliseconds = intervalInMilliseconds;
        this.allConfigurationStream = Observable.interval(intervalInMilliseconds, TimeUnit.MILLISECONDS)
                .map(getAllConfig)
                .doOnSubscribe(new Action0() {
                    @Override
                    public void call() {
                        isSourceCurrentlySubscribed.set(true);
                    }
                })
                .doOnUnsubscribe(new Action0() {
                    @Override
                    public void call() {
                        isSourceCurrentlySubscribed.set(false);
                    }
                })
                .share()
                .onBackpressureDrop();
    }

Example source: PipelineAI/pipeline

    private HystrixDashboardStream(int delayInMs) {
        this.delayInMs = delayInMs;
        this.singleSource = Observable.interval(delayInMs, TimeUnit.MILLISECONDS)
                .map(new Func1<Long, DashboardData>() {
                    @Override
                    public DashboardData call(Long timestamp) {
                        return new DashboardData(
                                HystrixCommandMetrics.getInstances(),
                                HystrixThreadPoolMetrics.getInstances(),
                                HystrixCollapserMetrics.getInstances()
                        );
                    }
                })
                .doOnSubscribe(new Action0() {
                    @Override
                    public void call() {
                        isSourceCurrentlySubscribed.set(true);
                    }
                })
                .doOnUnsubscribe(new Action0() {
                    @Override
                    public void call() {
                        isSourceCurrentlySubscribed.set(false);
                    }
                })
                .share()
                .onBackpressureDrop();
    }

Example source: Netflix/servo

    @Test
    public void testSendAllSlow() throws Exception {
        Observable<Integer> interval = Observable.interval(400,
                TimeUnit.MILLISECONDS).map(l -> l.intValue() + 1);
        // now add an observable that should timeout
        List<Observable<Integer>> batches = new ArrayList<>();
        batches.add(interval);
        int expectedSum = 3; // 1 + 2 should have been received from interval
        for (int i = 1; i <= 5; ++i) {
            batches.add(Observable.just(i));
            expectedSum += i;
        }
        HttpHelper httpHelper = new HttpHelper(null);
        int partial = httpHelper.sendAll(batches, expectedSum, 1000L);
        assertEquals(partial, expectedSum);
    }

Example source: bluelinelabs/Conductor

    @Override
    protected void onAttach(@NonNull View view) {
        super.onAttach(view);
        Log.i(TAG, "onAttach() called");
        (((ActionBarProvider) getActivity()).getSupportActionBar()).setTitle("RxLifecycle Demo");
        Observable.interval(1, TimeUnit.SECONDS)
                .doOnUnsubscribe(new Action0() {
                    @Override
                    public void call() {
                        Log.i(TAG, "Unsubscribing from onAttach()");
                    }
                })
                .compose(this.<Long>bindUntilEvent(ControllerEvent.DETACH))
                .subscribe(new Action1<Long>() {
                    @Override
                    public void call(Long num) {
                        Log.i(TAG, "Started in onAttach(), running until onDetach(): " + num);
                    }
                });
    }

Example source: jhusain/learnrxjava

    public static void main(String... args) {
        Observable<String> data = Observable.just("one", "two", "three", "four", "five");
        // Pair each item with an interval tick so that one item is printed per second.
        Observable.zip(data, Observable.interval(1, TimeUnit.SECONDS), (d, t) -> {
            return d + " " + t;
        }).toBlocking().forEach(System.out::println);
    }

Example source: THEONE10211024/RxJavaSamples

    /**
     * This method will only be called once when the retained Fragment is first created.
     */
    @Override
    public void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        // Retain this fragment across configuration changes.
        setRetainInstance(true);
        _storedIntsSubscription =//
                Observable.interval(1, TimeUnit.SECONDS)//
                        .map(new Func1<Long, Integer>() {
                            @Override
                            public Integer call(Long aLong) {
                                return aLong.intValue();
                            }
                        })//
                        .take(20)//
                        .subscribe(_intStream);
    }

Example source: cn-ljb/rxjava_for_android

    private void autoLoop() {
        if (subscribe_auto == null || subscribe_auto.isUnsubscribed()) {
            // Arguments: initial delay of 3000, period of 3000, time unit.
            subscribe_auto = Observable.interval(3000, 3000, TimeUnit.MILLISECONDS)
                    .compose(this.<Long>bindToLifecycle())
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribe(new Action1<Long>() {
                        @Override
                        public void call(Long aLong) {
                            int currentIndex = mViewPager.getCurrentItem();
                            if (++currentIndex == loopAdapter.getCount()) {
                                mViewPager.setCurrentItem(0);
                            } else {
                                mViewPager.setCurrentItem(currentIndex, true);
                            }
                        }
                    });
        }
    }

Example source: THEONE10211024/RxJavaSamples

    @OnClick(R.id.btn_demo_timing_4)
    public void Btn4_RunTask5Times_IntervalOf3s() {
        _log(String.format("D4 [%s] --- BTN click", _getCurrentTimestamp()));
        Observable//
                .interval(3, TimeUnit.SECONDS).take(5)//
                .subscribe(new Observer<Long>() {
                    @Override
                    public void onCompleted() {
                        _log(String.format("D4 [%s] XXX COMPLETE", _getCurrentTimestamp()));
                    }

                    @Override
                    public void onError(Throwable e) {
                        Timber.e(e, "something went wrong in TimingDemoFragment example");
                    }

                    @Override
                    public void onNext(Long number) {
                        _log(String.format("D4 [%s] NEXT", _getCurrentTimestamp()));
                    }
                });
    }

Example source: jhusain/learnrxjava

    public static void main(String... args) {
        TestScheduler test = Schedulers.test();
        TestSubscriber<String> ts = new TestSubscriber<>();
        // Run interval on the TestScheduler so that time is advanced manually.
        Observable.interval(200, TimeUnit.MILLISECONDS, test)
                .map(i -> {
                    return i + " value";
                }).subscribe(ts);
        test.advanceTimeBy(200, TimeUnit.MILLISECONDS);
        ts.assertReceivedOnNext(Arrays.asList("0 value"));
        test.advanceTimeTo(1000, TimeUnit.MILLISECONDS);
        ts.assertReceivedOnNext(Arrays.asList("0 value", "1 value", "2 value", "3 value", "4 value"));
    }

Example source: THEONE10211024/RxJavaSamples

    @OnClick(R.id.btn_demo_timing_2)
    public void Btn2_RunTask_IntervalOf1s() {
        // A second click cancels the running interval.
        if (_subscription1 != null && !_subscription1.isUnsubscribed()) {
            _subscription1.unsubscribe();
            _log(String.format("B2 [%s] XXX BTN KILLED", _getCurrentTimestamp()));
            return;
        }
        _log(String.format("B2 [%s] --- BTN click", _getCurrentTimestamp()));
        _subscription1 = Observable//
                .interval(1, TimeUnit.SECONDS)//
                .subscribe(new Observer<Long>() {
                    @Override
                    public void onCompleted() {
                        _log(String.format("B2 [%s] XXXX COMPLETE", _getCurrentTimestamp()));
                    }

                    @Override
                    public void onError(Throwable e) {
                        Timber.e(e, "something went wrong in TimingDemoFragment example");
                    }

                    @Override
                    public void onNext(Long number) {
                        _log(String.format("B2 [%s] NEXT", _getCurrentTimestamp()));
                    }
                });
    }

Example source: THEONE10211024/RxJavaSamples

    /**
     * This method will only be called once when the retained Fragment is first created.
     */
    @Override
    public void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        // Retain this fragment across configuration changes.
        setRetainInstance(true);
        if (_storedIntsObservable != null) {
            return;
        }
        Observable<Integer> intsObservable =//
                Observable.interval(1, TimeUnit.SECONDS)//
                        .map(new Func1<Long, Integer>() {
                            @Override
                            public Integer call(Long aLong) {
                                return aLong.intValue();
                            }
                        })//
                        .take(20);
        // -----------------------------------------------------------------------------------
        // Making our observable "HOT" for the purpose of the demo.
        //_intsObservable = _intsObservable.share();
        _storedIntsObservable = intsObservable.replay();
        _storedIntsSubscription = _storedIntsObservable.connect();
        // Do not do this in production!
        // `.share` is "warm" not "hot"
        // the below forceful subscription fakes the heat
        //_intsObservable.subscribe();
    }

Example source: stephanenicolas/toothpick

    @Inject
    public RxPresenter() {
        timeObservable = Observable.interval(1, TimeUnit.SECONDS) //
                .subscribeOn(Schedulers.newThread()) //
                .observeOn(AndroidSchedulers.mainThread()) //
                .publish();
        connect = timeObservable.connect();
    }

Example source: MaksTuev/ferro

    private void imitateDownloading(String bookId) {
        List<Integer> percents = Arrays.asList(5, 17, 33, 50, 66, 81, 92, 100);
        // Pair each percentage with a 600 ms tick to simulate download progress.
        Observable.zip(
                Observable.interval(600, TimeUnit.MILLISECONDS),
                Observable.from(percents),
                (t, percent) -> percent)
                .doOnNext(percent -> updateBook(bookId, percent))
                .subscribe();
    }
