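This article collects code examples for the rx.Observable.interval() method in Java and shows how Observable.interval() is used in practice. The examples come mainly from GitHub, Stack Overflow, and Maven, and were extracted from selected open-source projects, so they reflect real-world usage. Details of the Observable.interval() method are as follows: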
Package path: rx.Observable
Class name: Observable
Method name: interval
Returns an Observable that emits a sequential number every specified interval of time.
Scheduler: interval operates by default on the computation Scheduler.
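As a rough mental model of the behavior described above, the following sketch reproduces the tick sequence using only the JDK. It is not how RxJava implements interval: it swaps in a ScheduledExecutorService as the timer, and the class name IntervalModel and the method firstTicks are hypothetical names chosen for this illustration.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for interval(period, unit); a JDK-only model, not RxJava code.
final class IntervalModel {

    /** Blocks until the timer has produced {@code count} ticks and returns them in order. */
    static List<Long> firstTicks(int count, long period, TimeUnit unit) {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        List<Long> ticks = new CopyOnWriteArrayList<>();
        AtomicLong next = new AtomicLong();          // sequential number, starting at 0
        CountDownLatch done = new CountDownLatch(count);

        // The initial delay equals the period, so the first tick arrives one period after start.
        timer.scheduleAtFixedRate(() -> {
            if (done.getCount() > 0) {
                ticks.add(next.getAndIncrement());
                done.countDown();
            }
        }, period, period, unit);

        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();      // preserve the interrupt for the caller
        } finally {
            timer.shutdownNow();                     // stop the timer, like unsubscribing
        }
        return ticks;
    }

    public static void main(String[] args) {
        System.out.println(firstTicks(3, 50, TimeUnit.MILLISECONDS)); // prints [0, 1, 2]
    }
}
```

As with the two-argument interval(period, unit), the ticks count up from 0 and the first one arrives only after one full period has elapsed.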
Code example source: Netflix/conductor
@VisibleForTesting
private OnSubscribe<Message> getOnSubscribe() {
    return subscriber -> {
        Observable<Long> interval = Observable.interval(pollTimeInMS, TimeUnit.MILLISECONDS);
        interval.flatMap((Long x) -> {
            List<Message> msgs = receiveMessages();
            return Observable.from(msgs);
        }).subscribe(subscriber::onNext, subscriber::onError);
    };
}
Code example source: Netflix/conductor
@VisibleForTesting
OnSubscribe<Message> getOnSubscribe() {
    return subscriber -> {
        Observable<Long> interval = Observable.interval(pollTimeInMS, TimeUnit.MILLISECONDS);
        interval.flatMap((Long x) -> {
            List<Message> msgs = receiveMessages();
            return Observable.from(msgs);
        }).subscribe(subscriber::onNext, subscriber::onError);
    };
}
Code example source: Rukey7/MvpApp
/**
 * Countdown.
 * @param time number of seconds to count down from; negative values are treated as 0
 * @return an Observable that emits time, time - 1, ..., 0, one value per second
 */
public static Observable<Integer> countdown(int time) {
    if (time < 0) {
        time = 0;
    }
    final int countTime = time;
    return Observable.interval(0, 1, TimeUnit.SECONDS)
            .map(new Func1<Long, Integer>() {
                @Override
                public Integer call(Long increaseTime) {
                    return countTime - increaseTime.intValue();
                }
            })
            .take(countTime + 1)
            .subscribeOn(Schedulers.io())
            .unsubscribeOn(Schedulers.io())
            .subscribeOn(AndroidSchedulers.mainThread())
            .observeOn(AndroidSchedulers.mainThread());
}
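The arithmetic in the countdown example above (map each tick to countTime minus the tick, then take countTime + 1 items) can be checked without any timers. A minimal sketch, assuming a LongStream of 0, 1, 2, ... as a stand-in for the interval ticks; the class name CountdownValues and the method values are hypothetical and not part of MvpApp or RxJava.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.LongStream;

// Hypothetical helper: computes the values the countdown would emit, without waiting.
final class CountdownValues {

    static List<Integer> values(int time) {
        final int countTime = Math.max(time, 0);               // same clamp as the original
        return LongStream.iterate(0, tick -> tick + 1)         // 0, 1, 2, ... like interval ticks
                .limit(countTime + 1L)                         // corresponds to take(countTime + 1)
                .mapToObj(tick -> Integer.valueOf(countTime - (int) tick)) // corresponds to map(...)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(values(3)); // prints [3, 2, 1, 0]
    }
}
```

Taking countTime + 1 items rather than countTime is what lets the sequence end on 0.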
Code example source: Netflix/conductor
Observable<Long> interval = Observable.interval(100, TimeUnit.MILLISECONDS);
interval.flatMap((Long x) -> {
    List<Message> available = new LinkedList<>();
Code example source: bluelinelabs/Conductor
@NonNull
@Override
protected View onCreateView(@NonNull LayoutInflater inflater, @NonNull ViewGroup container) {
    Log.i(TAG, "onCreateView() called");
    View view = inflater.inflate(R.layout.controller_lifecycle, container, false);
    view.setBackgroundColor(ContextCompat.getColor(container.getContext(), R.color.red_300));
    unbinder = ButterKnife.bind(this, view);
    tvTitle.setText(getResources().getString(R.string.rxlifecycle_title, TAG));
    Observable.interval(1, TimeUnit.SECONDS)
            .doOnUnsubscribe(new Action0() {
                @Override
                public void call() {
                    Log.i(TAG, "Unsubscribing from onCreateView)");
                }
            })
            .compose(this.<Long>bindUntilEvent(ControllerEvent.DESTROY_VIEW))
            .subscribe(new Action1<Long>() {
                @Override
                public void call(Long num) {
                    Log.i(TAG, "Started in onCreateView(), running until onDestroyView(): " + num);
                }
            });
    return view;
}
Code example source: bluelinelabs/Conductor
public RxLifecycleController() {
    Observable.interval(1, TimeUnit.SECONDS)
            .doOnUnsubscribe(new Action0() {
                @Override
                public void call() {
                    Log.i(TAG, "Unsubscribing from constructor");
                }
            })
            .compose(this.<Long>bindUntilEvent(ControllerEvent.DESTROY))
            .subscribe(new Action1<Long>() {
                @Override
                public void call(Long num) {
                    Log.i(TAG, "Started in constructor, running until onDestroy(): " + num);
                }
            });
}
Code example source: PipelineAI/pipeline
/**
 * @deprecated Not for public use. Please use {@link #getInstance()}. This facilitates better stream-sharing
 * @param intervalInMilliseconds milliseconds between data emissions
 */
@Deprecated //deprecated in 1.5.4.
public HystrixUtilizationStream(final int intervalInMilliseconds) {
    this.intervalInMilliseconds = intervalInMilliseconds;
    this.allUtilizationStream = Observable.interval(intervalInMilliseconds, TimeUnit.MILLISECONDS)
            .map(getAllUtilization)
            .doOnSubscribe(new Action0() {
                @Override
                public void call() {
                    isSourceCurrentlySubscribed.set(true);
                }
            })
            .doOnUnsubscribe(new Action0() {
                @Override
                public void call() {
                    isSourceCurrentlySubscribed.set(false);
                }
            })
            .share()
            .onBackpressureDrop();
}
Code example source: PipelineAI/pipeline
/**
 * @deprecated Not for public use. Please use {@link #getInstance()}. This facilitates better stream-sharing
 * @param intervalInMilliseconds milliseconds between data emissions
 */
@Deprecated //deprecated in 1.5.4.
public HystrixConfigurationStream(final int intervalInMilliseconds) {
    this.intervalInMilliseconds = intervalInMilliseconds;
    this.allConfigurationStream = Observable.interval(intervalInMilliseconds, TimeUnit.MILLISECONDS)
            .map(getAllConfig)
            .doOnSubscribe(new Action0() {
                @Override
                public void call() {
                    isSourceCurrentlySubscribed.set(true);
                }
            })
            .doOnUnsubscribe(new Action0() {
                @Override
                public void call() {
                    isSourceCurrentlySubscribed.set(false);
                }
            })
            .share()
            .onBackpressureDrop();
}
Code example source: PipelineAI/pipeline
private HystrixDashboardStream(int delayInMs) {
    this.delayInMs = delayInMs;
    this.singleSource = Observable.interval(delayInMs, TimeUnit.MILLISECONDS)
            .map(new Func1<Long, DashboardData>() {
                @Override
                public DashboardData call(Long timestamp) {
                    return new DashboardData(
                            HystrixCommandMetrics.getInstances(),
                            HystrixThreadPoolMetrics.getInstances(),
                            HystrixCollapserMetrics.getInstances()
                    );
                }
            })
            .doOnSubscribe(new Action0() {
                @Override
                public void call() {
                    isSourceCurrentlySubscribed.set(true);
                }
            })
            .doOnUnsubscribe(new Action0() {
                @Override
                public void call() {
                    isSourceCurrentlySubscribed.set(false);
                }
            })
            .share()
            .onBackpressureDrop();
}
Code example source: Netflix/servo
@Test
public void testSendAllSlow() throws Exception {
    Observable<Integer> interval = Observable.interval(400,
            TimeUnit.MILLISECONDS).map(l -> l.intValue() + 1);
    // now add an observable that should timeout
    List<Observable<Integer>> batches = new ArrayList<>();
    batches.add(interval);
    int expectedSum = 3; // 1 + 2 should have been received from interval
    for (int i = 1; i <= 5; ++i) {
        batches.add(Observable.just(i));
        expectedSum += i;
    }
    HttpHelper httpHelper = new HttpHelper(null);
    int partial = httpHelper.sendAll(batches, expectedSum, 1000L);
    assertEquals(partial, expectedSum);
}
Code example source: bluelinelabs/Conductor
@Override
protected void onAttach(@NonNull View view) {
    super.onAttach(view);
    Log.i(TAG, "onAttach() called");
    (((ActionBarProvider)getActivity()).getSupportActionBar()).setTitle("RxLifecycle Demo");
    Observable.interval(1, TimeUnit.SECONDS)
            .doOnUnsubscribe(new Action0() {
                @Override
                public void call() {
                    Log.i(TAG, "Unsubscribing from onAttach()");
                }
            })
            .compose(this.<Long>bindUntilEvent(ControllerEvent.DETACH))
            .subscribe(new Action1<Long>() {
                @Override
                public void call(Long num) {
                    Log.i(TAG, "Started in onAttach(), running until onDetach(): " + num);
                }
            });
}
Code example source: jhusain/learnrxjava
public static void main(String... args) {
    Observable<String> data = Observable.just("one", "two", "three", "four", "five");
    Observable.zip(data, Observable.interval(1, TimeUnit.SECONDS), (d, t) -> {
        return d + " " + t;
    }).toBlocking().forEach(System.out::println);
}
Code example source: THEONE10211024/RxJavaSamples
/**
 * This method will only be called once when the retained Fragment is first created.
 */
@Override
public void onCreate(Bundle savedInstanceState) {
    super.onCreate(savedInstanceState);
    // Retain this fragment across configuration changes.
    setRetainInstance(true);
    _storedIntsSubscription =//
            Observable.interval(1, TimeUnit.SECONDS)//
                    .map(new Func1<Long, Integer>() {
                        @Override
                        public Integer call(Long aLong) {
                            return aLong.intValue();
                        }
                    })//
                    .take(20)//
                    .subscribe(_intStream);
}
Code example source: cn-ljb/rxjava_for_android
private void autoLoop() {
    if (subscribe_auto == null || subscribe_auto.isUnsubscribed()) {
        subscribe_auto = Observable.interval(3000, 3000, TimeUnit.MILLISECONDS)
                // arguments: initial delay of 3000, then one tick every 3000, time unit (milliseconds)
                .compose(this.<Long>bindToLifecycle())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Action1<Long>() {
                    @Override
                    public void call(Long aLong) {
                        int currentIndex = mViewPager.getCurrentItem();
                        if (++currentIndex == loopAdapter.getCount()) {
                            mViewPager.setCurrentItem(0);
                        } else {
                            mViewPager.setCurrentItem(currentIndex, true);
                        }
                    }
                });
    }
}
Code example source: THEONE10211024/RxJavaSamples
@OnClick(R.id.btn_demo_timing_4)
public void Btn4_RunTask5Times_IntervalOf3s() {
    _log(String.format("D4 [%s] --- BTN click", _getCurrentTimestamp()));
    Observable//
            .interval(3, TimeUnit.SECONDS).take(5)//
            .subscribe(new Observer<Long>() {
                @Override
                public void onCompleted() {
                    _log(String.format("D4 [%s] XXX COMPLETE", _getCurrentTimestamp()));
                }
                @Override
                public void onError(Throwable e) {
                    Timber.e(e, "something went wrong in TimingDemoFragment example");
                }
                @Override
                public void onNext(Long number) {
                    _log(String.format("D4 [%s] NEXT", _getCurrentTimestamp()));
                }
            });
}
Code example source: jhusain/learnrxjava
public static void main(String... args) {
    TestScheduler test = Schedulers.test();
    TestSubscriber<String> ts = new TestSubscriber<>();
    Observable.interval(200, TimeUnit.MILLISECONDS, test)
            .map(i -> {
                return i + " value";
            }).subscribe(ts);
    test.advanceTimeBy(200, TimeUnit.MILLISECONDS);
    ts.assertReceivedOnNext(Arrays.asList("0 value"));
    test.advanceTimeTo(1000, TimeUnit.MILLISECONDS);
    ts.assertReceivedOnNext(Arrays.asList("0 value", "1 value", "2 value", "3 value", "4 value"));
}
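The two assertions in the TestScheduler example above follow from simple arithmetic on virtual time. A minimal sketch, assuming the first item is emitted exactly at the initial delay and one more item after every full period; the class name IntervalMath and the method emittedBy are hypothetical and not part of RxJava.

```java
// Hypothetical helper: counts how many items an interval source has emitted by a given time.
final class IntervalMath {

    /**
     * @param t            elapsed (virtual) time, inclusive
     * @param initialDelay time of the first emission
     * @param period       time between subsequent emissions, must be positive
     * @return number of items emitted at or before time t (all times in the same unit)
     */
    static long emittedBy(long t, long initialDelay, long period) {
        if (period <= 0) {
            throw new IllegalArgumentException("period must be positive");
        }
        if (t < initialDelay) {
            return 0;                                // nothing has been emitted yet
        }
        return (t - initialDelay) / period + 1;      // first item plus one per full period
    }

    public static void main(String[] args) {
        System.out.println(emittedBy(200, 200, 200));  // prints 1
        System.out.println(emittedBy(1000, 200, 200)); // prints 5
    }
}
```

With the values from the test, one item ("0 value") is expected after 200 ms and five items ("0 value" through "4 value") once the clock reaches 1000 ms.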
Code example source: THEONE10211024/RxJavaSamples
@OnClick(R.id.btn_demo_timing_2)
public void Btn2_RunTask_IntervalOf1s() {
    if (_subscription1 != null && !_subscription1.isUnsubscribed()) {
        _subscription1.unsubscribe();
        _log(String.format("B2 [%s] XXX BTN KILLED", _getCurrentTimestamp()));
        return;
    }
    _log(String.format("B2 [%s] --- BTN click", _getCurrentTimestamp()));
    _subscription1 = Observable//
            .interval(1, TimeUnit.SECONDS)//
            .subscribe(new Observer<Long>() {
                @Override
                public void onCompleted() {
                    _log(String.format("B2 [%s] XXXX COMPLETE", _getCurrentTimestamp()));
                }
                @Override
                public void onError(Throwable e) {
                    Timber.e(e, "something went wrong in TimingDemoFragment example");
                }
                @Override
                public void onNext(Long number) {
                    _log(String.format("B2 [%s] NEXT", _getCurrentTimestamp()));
                }
            });
}
Code example source: THEONE10211024/RxJavaSamples
/**
 * This method will only be called once when the retained Fragment is first created.
 */
@Override
public void onCreate(Bundle savedInstanceState) {
    super.onCreate(savedInstanceState);
    // Retain this fragment across configuration changes.
    setRetainInstance(true);
    if (_storedIntsObservable != null) {
        return;
    }
    Observable<Integer> intsObservable =//
            Observable.interval(1, TimeUnit.SECONDS)//
                    .map(new Func1<Long, Integer>() {
                        @Override
                        public Integer call(Long aLong) {
                            return aLong.intValue();
                        }
                    })//
                    .take(20);
    // -----------------------------------------------------------------------------------
    // Making our observable "HOT" for the purpose of the demo.
    //_intsObservable = _intsObservable.share();
    _storedIntsObservable = intsObservable.replay();
    _storedIntsSubscription = _storedIntsObservable.connect();
    // Do not do this in production!
    // `.share` is "warm" not "hot"
    // the below forceful subscription fakes the heat
    //_intsObservable.subscribe();
}
Code example source: stephanenicolas/toothpick
@Inject
public RxPresenter() {
    timeObservable = Observable.interval(1, TimeUnit.SECONDS) //
            .subscribeOn(Schedulers.newThread()) //
            .observeOn(AndroidSchedulers.mainThread()) //
            .publish();
    connect = timeObservable.connect();
}
Code example source: MaksTuev/ferro
private void imitateDownloading(String bookId) {
    List<Integer> percents = Arrays.asList(5, 17, 33, 50, 66, 81, 92, 100);
    Observable.zip(
            Observable.interval(600, TimeUnit.MILLISECONDS),
            Observable.from(percents),
            (t, percent) -> percent)
            .doOnNext(percent -> updateBook(bookId, percent))
            .subscribe();
}