postgresql 未使用pqxx::notification_receiver接收通知

juud5qan  于 2023-06-22  发布在  PostgreSQL
关注(0)|答案(1)|浏览(147)

我已经设置了一个触发器,并使用pgadmin4(使用LISTEN)进行了测试,表的更改会得到通知。期望的是,一个Web界面可能会更改设置,我的后端代码必须拾取这些更改并应用。
我创建了一个小类(函数是内联的)来扩展通知接收器类。

class SettingsChangeListener: public pqxx::notification_receiver
{
public:
  SettingsChangeListener(pqxx::connection_base &c);
  virtual void operator()(const std::string &payload, int backend_pid)
  throw ();

  void RegisterParentData(SINT32 queueId, UINT32 eventID);

private:
  SINT32 m_parentQueueHandle; // handle to the parent's message queue
  UINT32 m_eventID; // event to send to the parent on notification
};

inline SettingsChangeListener::SettingsChangeListener(pqxx::connection_base &c)
   : pqxx::notification_receiver(c, "settings_changed"), m_parentQueueHandle(0), m_eventID(0)
{

}

inline void SettingsChangeListener::operator()(const std::string &payload, int backend_pid)
throw ()
{
   if (m_parentQueueHandle > 0)
   {
      LOG_SYSTEM_INFO("Detected Change in Settings\n");
      // don't need the notification text - notify parent that the data has changed
      CmnMessage msg;
      msg.SetMessageID(m_eventID);
      msg.SetMessageType(MESSAGE_TYPE_Local);
      msg.SetMessageSource(MODULE_DATABASE);

      MsgQueueMessage msgToSend;
      size_t size = msg.Export(msgToSend);

      if (FAIL == msgsnd(m_parentQueueHandle, &msgToSend, size, IPC_NOWAIT))
      {
         LOG_SYSTEM_ERROR("FAILED to send message on queue : %s\n", strerror(errno));
      }
   }
}

inline void SettingsChangeListener::RegisterParentData(SINT32 queueId, UINT32 eventID)
{
   m_parentQueueHandle = msgget(queueId, IPC_CREAT | MSGQ_CREAT_PERMISSION);
   m_eventID = eventID;
}

在我的另一个类中,我包含了SettingsChangeListener的一个成员,并在该类的构造函数中进行了初始化。

MyClass::MyClass()
    : m_connection(DB_CONNECTION_STRING.c_str()), m_settingsDbAdapter(m_connection), m_settingsChangeListener(m_connection)

我的天
连接和成员在应用程序的持续时间内保持不变。DB适配器是一个管理设置表(添加/更新/删除/获取)的类,并且可以正常工作。
我在operator()()中放置了一个断点,它永远不会被命中,我永远不会看到通知。我设置了一个断点并确认父数据的注册按预期发生。
这是我第一次使用pqxx,并不是所有的函数看起来都很容易实现。我在想,有一些简单的东西我错过了,但我就是找不到它。

nzrxty8p

nzrxty8p1#

我在示例代码中遗漏了一部分。这个功能应该存在于线程的上下文中,并且需要以下内容。这是从示例中逐字引用的。我将删除我不使用的部分。总的来说,pselect可能是让它工作的关键。

while (true)
   {
      /* Step #4.  Create set of file descriptors to pass to 'pselect()'.
       * You can put anything in here that you want (files, sockets, serial ports,
       * descriptors from other sources).
       *
       * IMPORTANT: If you want to catch 'NOTIFY' events, you MUST add the
       * 'db.sock()' descriptor!  This is the socket for the database connection,
       * where notifications from the database backend come in.
       */
      fd_set read_fds;
      FD_ZERO(&read_fds);
      FD_SET(STDIN_FILENO, &read_fds);  // optional (just part of the demo)
      FD_SET(db.sock(), &read_fds);    // postgres connection (mandatory)
      const int max_fd = db.sock() + 1;

      /* Optional timeout (in case your process wanted to take action if no events
       * occur for a period of time).
       */
      struct timespec ts;
      ts.tv_sec = 5;
      ts.tv_nsec = 0;

      /* Step #5.  Magic happens here!  Put process to sleep until something
       * "interesting" happens.
       *
       * Unix was built to do "synchronous IO multiplexing".
       *
       * pselect() can return the following information values:
       * -1 = pselect() failed or was interrupted (errno == EINTR).
       *  0 = timeout expired (timeout is optional).
       * >1 = count of file descriptors in any of the sets that have activity.
       *
       * If the return value is positive, pselect() will have modified the file
       * descriptor sets: descriptors with activity will remain set, but all
       * others will be cleared.
       *
       * pselect() can return "failure" (-1) for only one acceptable reason: it
       * was interrupted by a signal.  For this demo, we assume that that signal
       * is SIGCHLD.  Any other errno value is beyond the scope of the demo.
       */
      switch (pselect(max_fd, &read_fds, NULL, NULL, &ts, &origmask))
      {
         case -1:
            if (errno != EINTR) throw CStandardError("pselect");

            /* Interrupted by signal, which we assume is SIGCHLD.
             *
             * We reap children here, but without updating read_fds.  So don't
             * re-use it when we're done reaping.
             */
            reap_children(db);
            break;

         case 0:
            // No file descriptors ready; we got here through timeout.
            std::cout << "pselect() timeout" << std::endl;
            break;

         default:
            // Check for, and handle, incoming data on stdin.
            if (FD_ISSET(STDIN_FILENO, &read_fds))
               handle_stdin_data();

            // Check for, and handle, notifications.  Calls MyNoticer::operator().
            if (FD_ISSET(db.sock(), &read_fds))
               db.get_notifs();

            break;
      }
   }

相关问题