我想写一个自定义的RxCpp调度程序,它在我的应用程序的事件队列上执行操作。为此,我正在学习相关的接口。
我下面的简单代码由于SIGSEGV
而失败--当逐步执行代码时,看起来RxCpp代码确实是在解引用空指针。什么是正确的模式?
#include <rxcpp/rx.hpp>
#include <iostream>
class simple_worker : public rxcpp::schedulers::worker_interface {
public:
clock_type::time_point now() const override {
return clock_type::now();
}
void schedule(const rxcpp::schedulers::schedulable& scbl) const override {
scbl();
}
void schedule(clock_type::time_point when, const rxcpp::schedulers::schedulable& scbl) const override {
// TODO: Cheating, not using "when".
scbl();
}
};
class simple_service : public rxcpp::schedulers::scheduler_interface {
public:
virtual clock_type::time_point now() const override {
return clock_type::now();
}
virtual rxcpp::schedulers::worker create_worker(rxcpp::composite_subscription cs) const override {
return rxcpp::schedulers::worker(std::move(cs), std::make_shared<simple_worker>());
}
};
void testSubjects() {
rxcpp::subjects::subject<int> s;
auto threads = rxcpp::observe_on_one_worker(rxcpp::schedulers::make_scheduler<simple_service>());
s.get_observable().observe_on(threads) |
rxcpp::operators::subscribe<int>(
[](int j) {
std::cout << " observer received " << j << std::endl;
},
[](const std::exception_ptr&) {
std::cout << " observer received error" << std::endl;
},
[]() {
std::cout << " observer received complete" << std::endl;
});
for (int i = 0; i < 100; i++) {
s.get_subscriber().on_next(i);
}
}
int main() {
testSubjects();
}
1条答案
按热度按时间yqhsw0fo1#
Rxcpp使用
recursor
在调度程序中进行递归调用。你需要像这样调用可扩展的东西:这是如何立即实施:https://github.com/ReactiveX/RxCpp/blob/main/Rx/v2/src/rxcpp/schedulers/rx-immediate.hpp#L40