c++ ReadDirectoryChangesW停止处理大量文件

ia2d9nvy  于 11个月前  发布在  其他
关注(0)|答案(1)|浏览(93)

我使用ReadDirectoryChangesWIOCompletionQueue一起接收通知,如果新文件被添加到特定文件夹。
我目前面临的问题是,如果一个巨大的批量复制操作发生(约800个文件),我的程序停止工作。
我调试了程序,发现它在调用GetQueuedCompletionStatus时总是失败。函数调用返回TRUE,但传入的变量lpNumberOfBytesTransfered被设置为0
下面是我使用的代码(我也欢迎任何建议和帮助,因为我不是Windows API和ReadDirectoryChangesW方面的Maven)

//FileIOAdapterImpl
HandlePtr FileIoAdapterImpl::CreateIoCompletionPortWrapper() {
  HandlePtr io_handle{ CreateIoCompletionPort(INVALID_HANDLE_VALUE, nullptr, 0, 0) };
  return io_handle;
}

bool FileIoAdapterImpl::AssociateWithFileHandle(HANDLE directory, HANDLE existing_completion_port) {
  return CreateIoCompletionPort(directory, existing_completion_port, (ULONG_PTR)directory, 1);
}

bool FileIoAdapterImpl::GetQueuedCompletionStatusWrapper(HANDLE completion_token,
  LPDWORD bytes_transferred, PULONG_PTR completion_key, LPOVERLAPPED* overlap) {
  return GetQueuedCompletionStatus(completion_token, bytes_transferred, completion_key, overlap, 16);

}

HandlePtr FileIoAdapterImpl::CreateFileWrapper(const std::string& path) {
  HandlePtr dir_handle{
    CreateFileA(path.c_str(), FILE_LIST_DIRECTORY,
                FILE_SHARE_READ | FILE_SHARE_WRITE | FILE_SHARE_DELETE, NULL,
                OPEN_EXISTING,
                FILE_FLAG_BACKUP_SEMANTICS | FILE_FLAG_OVERLAPPED, NULL)
  };

  return dir_handle;
}
bool FileIoAdapterImpl::ReadDirectoryChangesWrapper(HANDLE directory, std::vector<std::byte>& buffer, LPDWORD bytes_returned, OVERLAPPED* overlapped) {
  return ReadDirectoryChangesW(directory, buffer.data(), buffer.size(), FALSE, FILE_NOTIFY_CHANGE_FILE_NAME, bytes_returned, overlapped, nullptr);
}

字符串
这个类只是我使用的WindowsAPI函数的 Package 器(用于测试/模拟目的)。HandlePtr只是一个unique_ptr,带有一个自定义删除器来调用CloseHandle函数。

//ModernDirectoryWatcher.h
class ModernDirectoryWatcher {
public:
  using Callback = std::function<void(const std::string& filename)>;

  ModernDirectoryWatcher(const std::filesystem::path& input_directory, std::vector<std::string> file_types, Callback callback, FileIOAdapterPtr io_wrapper = std::make_shared<FileIoAdapterImpl>());

  explicit operator bool() const {
    return is_valid_;
  }

  bool watch();

  void stop();

private:

  bool event_recv();
  bool event_send();
  void handle_events();
  bool has_event() const {
    return event_buf_len_ready_ != 0;
  }
  bool is_processable_file(const std::filesystem::path& path) const;

  Poco::Logger& logger_{ Poco::Logger::get("ModernDirectoryWatcher") };
  std::filesystem::path input_directory_;
  std::vector<std::string> file_types_{};
  Callback callback_;
  FileIOAdapterPtr win_io_api_wrapper_;
  HandlePtr path_handle_;
  HandlePtr event_completion_token_;
  unsigned long event_buf_len_ready_{ 0 };
  bool is_valid_{ false };
  OVERLAPPED event_overlap_{};
  std::vector<std::byte> event_buf_{ 64 * 1024 };
};
//ModernDirectoryWatcher.cpp
ModernDirectoryWatcher::ModernDirectoryWatcher(const std::filesystem::path& input_directory,
  std::vector<std::string> file_types, Callback callback, FileIOAdapterPtr io_wrapper)
  : input_directory_(input_directory),
  file_types_(std::move(file_types)),
  callback_(std::move(callback)),
  win_io_api_wrapper_(std::move(io_wrapper)) {

  path_handle_ = win_io_api_wrapper_->CreateFileWrapper(input_directory_.string());

  if (path_handle_.get() != INVALID_HANDLE_VALUE) {
    poco_information(logger_, "Create Completition Token");
    event_completion_token_ = win_io_api_wrapper_->CreateIoCompletionPortWrapper();
  }

  if (event_completion_token_.get() != nullptr) {
    poco_information(logger_, "Associate with the FileHandle");
    is_valid_ = win_io_api_wrapper_->AssociateWithFileHandle(path_handle_.get(), event_completion_token_.get());
  }
}

bool ModernDirectoryWatcher::watch() {
  poco_information(logger_, "Start Watching...");
  if (is_valid_) {
    poco_information(logger_, "Receive Events");
    event_recv();

    while (is_valid_ && has_event()) {
      poco_information(logger_, "There are still events to process...");
      event_send();
    }

    while (is_valid_) {
      ULONG_PTR completion_key{ 0 };
      LPOVERLAPPED overlap{ 0 };
      bool complete = win_io_api_wrapper_->GetQueuedCompletionStatusWrapper(event_completion_token_.get(), &event_buf_len_ready_, &completion_key, &overlap);
      if (complete && event_buf_len_ready_ == 0) {
        poco_error(logger_, "Error"); // HERE THE ERROR IS PRINTED 
      }
      if (complete && overlap) {
        poco_information(logger_, "Handle the events");
        handle_events();
      } else if (int err_code = GetLastError() != 258 && !complete) {
        poco_error(logger_, "Error");
      }
    }
    return true;
  } else {
    return false;
  }
}

void ModernDirectoryWatcher::handle_events() {
  while (is_valid_ && has_event()) {
    poco_information(logger_, "Send Event");
    event_send();
    poco_information(logger_, "Receive Events");
    event_recv();
  }
}

void ModernDirectoryWatcher::stop() {
  poco_notice(logger_, "Stop the Watcher");
  is_valid_ = false;
}

bool ModernDirectoryWatcher::event_recv() {
  event_buf_len_ready_ = 0;
  DWORD bytes_returned = 0;
  memset(&event_overlap_, 0, sizeof(OVERLAPPED));
  poco_information(logger_, "Call the ReadDirectoryChanges");
  auto read_ok = win_io_api_wrapper_->ReadDirectoryChangesWrapper(path_handle_.get(), event_buf_, &bytes_returned, &event_overlap_);

 
  if (!event_buf_.empty() && read_ok) {
    event_buf_len_ready_ = bytes_returned > 0 ? bytes_returned : 0;
    poco_information_f1(logger_, "Event Buffer Len: %?d", event_buf_len_ready_);
    return true;
  }

  if (GetLastError() == ERROR_IO_PENDING) {
    poco_error(logger_, "Error Pending IO Received stopping...");
    event_buf_len_ready_ = 0;
    is_valid_ = false;
  } else {
    poco_error_f1(logger_, "Error Code: %?d", GetLastError());
  }
  return false;
}

bool ModernDirectoryWatcher::event_send() {
  auto buf = reinterpret_cast<FILE_NOTIFY_INFORMATION*>(event_buf_.data());
  if (is_valid_) {
    while (buf + sizeof(FILE_NOTIFY_INFORMATION) <= buf + event_buf_len_ready_) {
      poco_information(logger_, "Get valid Buffer...");
      auto filename = input_directory_ / std::wstring{ buf->FileName, buf->FileNameLength / 2 };
      if ((buf->Action == FILE_ACTION_ADDED || buf->Action == FILE_ACTION_RENAMED_NEW_NAME) && is_processable_file(filename)) {
        callback_(filename.string());
      }

      if (buf->NextEntryOffset == 0) {
        break;
      }

      buf = reinterpret_cast<FILE_NOTIFY_INFORMATION*>(reinterpret_cast<std::byte*>(buf) + buf->NextEntryOffset);

    }
    return true;
  } else {
    return false;
  }
}

bool ModernDirectoryWatcher::is_processable_file(const std::filesystem::path& path) const {
  std::string extension = path.extension().string().erase(0, 1);
  extension = Poco::toLower(extension);
  return std::find(file_types_.begin(), file_types_.end(), extension) != file_types_.end();
}

的数据
我从我的应用程序中以线程的形式启动Watcher,如下所示:

ModernDirectoryWatcher directory_watcher{dir, ftypes, [this](const std::string& filename) {...})}
std::thread watcher_thread = std::thread{&ModernDirectoryWatcher::watch, &directory_watcher}


如果我复制了大量的文件(~800)文件到监视目录的代码打印出错误,然后如果我再次添加文件(仅举一例)文件不再被识别。我知道发生这种情况是因为我检查has_event函数,如果event_buf_len_ready_大于0,则通常应该是什么情况,如果GetQueuedCompletionStatus是返回TRUE。但如果我删除检查,我开始得到垃圾的缓冲区。
编辑:
我在调试过程中看到的一件事。如果我在if (complete && event_buf_len_ready_ == 0)内部设置一个断点,然后查看OVERLAPPED结构,我看到internal属性的值为268。但我快速执行了net helpmsg 268,但它没有找到该数字的错误消息。

xurqigkl

xurqigkl1#

这是有记录的行为。

When you first call ReadDirectoryChangesW, the system allocates a buffer to
store change information. This buffer is associated with the directory handle until
it is closed and its size does not change during its lifetime. Directory changes
that occur between calls to this function are added to the buffer and then
returned with the next call.

If the buffer overflows, ReadDirectoryChangesW will still return true, but the
entire contents of the buffer are discarded and the lpBytesReturned parameter
will be zero, which indicates that your buffer was too small to hold all of the
changes that occurred.

字符串
在这种情况下,您必须假设更改非常大,因此从头开始重新读取目录是更好的方法。

相关问题