c++ RxCpp:如何编写自定义调度程序

e3bfsja2  于 2023-10-20  发布在  其他
关注(0)|答案(1)|浏览(200)

我想写一个自定义的RxCpp调度程序,它在我的应用程序的事件队列上执行操作。为此,我正在学习相关的接口。
我下面的简单代码由于SIGSEGV而失败--当逐步执行代码时,看起来RxCpp代码确实是在解引用空指针。什么是正确的模式?

#include <rxcpp/rx.hpp>
#include <iostream>

class simple_worker : public rxcpp::schedulers::worker_interface {
public:
  clock_type::time_point now() const override {
    return clock_type::now();
  }

  void schedule(const rxcpp::schedulers::schedulable& scbl) const override {
    scbl();
  }

  void schedule(clock_type::time_point when, const rxcpp::schedulers::schedulable& scbl) const override {
    // TODO: Cheating, not using "when".
    scbl();
  }
};

class simple_service : public rxcpp::schedulers::scheduler_interface {
public:
  virtual clock_type::time_point now() const override {
    return clock_type::now();
  }

  virtual rxcpp::schedulers::worker create_worker(rxcpp::composite_subscription cs) const override {
    return rxcpp::schedulers::worker(std::move(cs), std::make_shared<simple_worker>());
  }
};

void testSubjects() {
  rxcpp::subjects::subject<int> s;

  auto threads = rxcpp::observe_on_one_worker(rxcpp::schedulers::make_scheduler<simple_service>());

  s.get_observable().observe_on(threads) |
  rxcpp::operators::subscribe<int>(
      [](int j) {
        std::cout << "  observer received " << j << std::endl;
      },
      [](const std::exception_ptr&) {
        std::cout << "  observer received error" << std::endl;
      },
      []() {
        std::cout << "  observer received complete" << std::endl;
      });

  for (int i = 0; i < 100; i++) {
    s.get_subscriber().on_next(i);
  }
}

int main() {
  testSubjects();
}
yqhsw0fo

yqhsw0fo1#

Rxcpp使用recursor在调度程序中进行递归调用。你需要像这样调用可扩展的东西:

if (scbl.is_subscribed()) {
                // allow recursion
                recursion r(true);
                scbl(r.get_recurse());
            }

这是如何立即实施:https://github.com/ReactiveX/RxCpp/blob/main/Rx/v2/src/rxcpp/schedulers/rx-immediate.hpp#L40

相关问题