我使用ReadDirectoryChangesW
和IOCompletionQueue
一起接收通知,如果新文件被添加到特定文件夹。
我目前面临的问题是,如果一个巨大的批量复制操作发生(约800个文件),我的程序停止工作。
我调试了程序,发现它在调用GetQueuedCompletionStatus
时总是失败。函数调用返回TRUE
,但传入的变量lpNumberOfBytesTransfered
被设置为0
。
下面是我使用的代码(我也欢迎任何建议和帮助,因为我不是Windows API和ReadDirectoryChangesW方面的Maven)
//FileIOAdapterImpl
HandlePtr FileIoAdapterImpl::CreateIoCompletionPortWrapper() {
HandlePtr io_handle{ CreateIoCompletionPort(INVALID_HANDLE_VALUE, nullptr, 0, 0) };
return io_handle;
}
bool FileIoAdapterImpl::AssociateWithFileHandle(HANDLE directory, HANDLE existing_completion_port) {
return CreateIoCompletionPort(directory, existing_completion_port, (ULONG_PTR)directory, 1);
}
bool FileIoAdapterImpl::GetQueuedCompletionStatusWrapper(HANDLE completion_token,
LPDWORD bytes_transferred, PULONG_PTR completion_key, LPOVERLAPPED* overlap) {
return GetQueuedCompletionStatus(completion_token, bytes_transferred, completion_key, overlap, 16);
}
HandlePtr FileIoAdapterImpl::CreateFileWrapper(const std::string& path) {
HandlePtr dir_handle{
CreateFileA(path.c_str(), FILE_LIST_DIRECTORY,
FILE_SHARE_READ | FILE_SHARE_WRITE | FILE_SHARE_DELETE, NULL,
OPEN_EXISTING,
FILE_FLAG_BACKUP_SEMANTICS | FILE_FLAG_OVERLAPPED, NULL)
};
return dir_handle;
}
bool FileIoAdapterImpl::ReadDirectoryChangesWrapper(HANDLE directory, std::vector<std::byte>& buffer, LPDWORD bytes_returned, OVERLAPPED* overlapped) {
return ReadDirectoryChangesW(directory, buffer.data(), buffer.size(), FALSE, FILE_NOTIFY_CHANGE_FILE_NAME, bytes_returned, overlapped, nullptr);
}
字符串
这个类只是我使用的WindowsAPI函数的 Package 器(用于测试/模拟目的)。HandlePtr
只是一个unique_ptr
,带有一个自定义删除器来调用CloseHandle
函数。
//ModernDirectoryWatcher.h
class ModernDirectoryWatcher {
public:
using Callback = std::function<void(const std::string& filename)>;
ModernDirectoryWatcher(const std::filesystem::path& input_directory, std::vector<std::string> file_types, Callback callback, FileIOAdapterPtr io_wrapper = std::make_shared<FileIoAdapterImpl>());
explicit operator bool() const {
return is_valid_;
}
bool watch();
void stop();
private:
bool event_recv();
bool event_send();
void handle_events();
bool has_event() const {
return event_buf_len_ready_ != 0;
}
bool is_processable_file(const std::filesystem::path& path) const;
Poco::Logger& logger_{ Poco::Logger::get("ModernDirectoryWatcher") };
std::filesystem::path input_directory_;
std::vector<std::string> file_types_{};
Callback callback_;
FileIOAdapterPtr win_io_api_wrapper_;
HandlePtr path_handle_;
HandlePtr event_completion_token_;
unsigned long event_buf_len_ready_{ 0 };
bool is_valid_{ false };
OVERLAPPED event_overlap_{};
std::vector<std::byte> event_buf_{ 64 * 1024 };
};
//ModernDirectoryWatcher.cpp
ModernDirectoryWatcher::ModernDirectoryWatcher(const std::filesystem::path& input_directory,
std::vector<std::string> file_types, Callback callback, FileIOAdapterPtr io_wrapper)
: input_directory_(input_directory),
file_types_(std::move(file_types)),
callback_(std::move(callback)),
win_io_api_wrapper_(std::move(io_wrapper)) {
path_handle_ = win_io_api_wrapper_->CreateFileWrapper(input_directory_.string());
if (path_handle_.get() != INVALID_HANDLE_VALUE) {
poco_information(logger_, "Create Completition Token");
event_completion_token_ = win_io_api_wrapper_->CreateIoCompletionPortWrapper();
}
if (event_completion_token_.get() != nullptr) {
poco_information(logger_, "Associate with the FileHandle");
is_valid_ = win_io_api_wrapper_->AssociateWithFileHandle(path_handle_.get(), event_completion_token_.get());
}
}
bool ModernDirectoryWatcher::watch() {
poco_information(logger_, "Start Watching...");
if (is_valid_) {
poco_information(logger_, "Receive Events");
event_recv();
while (is_valid_ && has_event()) {
poco_information(logger_, "There are still events to process...");
event_send();
}
while (is_valid_) {
ULONG_PTR completion_key{ 0 };
LPOVERLAPPED overlap{ 0 };
bool complete = win_io_api_wrapper_->GetQueuedCompletionStatusWrapper(event_completion_token_.get(), &event_buf_len_ready_, &completion_key, &overlap);
if (complete && event_buf_len_ready_ == 0) {
poco_error(logger_, "Error"); // HERE THE ERROR IS PRINTED
}
if (complete && overlap) {
poco_information(logger_, "Handle the events");
handle_events();
} else if (int err_code = GetLastError() != 258 && !complete) {
poco_error(logger_, "Error");
}
}
return true;
} else {
return false;
}
}
void ModernDirectoryWatcher::handle_events() {
while (is_valid_ && has_event()) {
poco_information(logger_, "Send Event");
event_send();
poco_information(logger_, "Receive Events");
event_recv();
}
}
void ModernDirectoryWatcher::stop() {
poco_notice(logger_, "Stop the Watcher");
is_valid_ = false;
}
bool ModernDirectoryWatcher::event_recv() {
event_buf_len_ready_ = 0;
DWORD bytes_returned = 0;
memset(&event_overlap_, 0, sizeof(OVERLAPPED));
poco_information(logger_, "Call the ReadDirectoryChanges");
auto read_ok = win_io_api_wrapper_->ReadDirectoryChangesWrapper(path_handle_.get(), event_buf_, &bytes_returned, &event_overlap_);
if (!event_buf_.empty() && read_ok) {
event_buf_len_ready_ = bytes_returned > 0 ? bytes_returned : 0;
poco_information_f1(logger_, "Event Buffer Len: %?d", event_buf_len_ready_);
return true;
}
if (GetLastError() == ERROR_IO_PENDING) {
poco_error(logger_, "Error Pending IO Received stopping...");
event_buf_len_ready_ = 0;
is_valid_ = false;
} else {
poco_error_f1(logger_, "Error Code: %?d", GetLastError());
}
return false;
}
bool ModernDirectoryWatcher::event_send() {
auto buf = reinterpret_cast<FILE_NOTIFY_INFORMATION*>(event_buf_.data());
if (is_valid_) {
while (buf + sizeof(FILE_NOTIFY_INFORMATION) <= buf + event_buf_len_ready_) {
poco_information(logger_, "Get valid Buffer...");
auto filename = input_directory_ / std::wstring{ buf->FileName, buf->FileNameLength / 2 };
if ((buf->Action == FILE_ACTION_ADDED || buf->Action == FILE_ACTION_RENAMED_NEW_NAME) && is_processable_file(filename)) {
callback_(filename.string());
}
if (buf->NextEntryOffset == 0) {
break;
}
buf = reinterpret_cast<FILE_NOTIFY_INFORMATION*>(reinterpret_cast<std::byte*>(buf) + buf->NextEntryOffset);
}
return true;
} else {
return false;
}
}
bool ModernDirectoryWatcher::is_processable_file(const std::filesystem::path& path) const {
std::string extension = path.extension().string().erase(0, 1);
extension = Poco::toLower(extension);
return std::find(file_types_.begin(), file_types_.end(), extension) != file_types_.end();
}
的数据
我从我的应用程序中以线程的形式启动Watcher,如下所示:
ModernDirectoryWatcher directory_watcher{dir, ftypes, [this](const std::string& filename) {...})}
std::thread watcher_thread = std::thread{&ModernDirectoryWatcher::watch, &directory_watcher}
型
如果我复制了大量的文件(~800)文件到监视目录的代码打印出错误,然后如果我再次添加文件(仅举一例)文件不再被识别。我知道发生这种情况是因为我检查has_event
函数,如果event_buf_len_ready_
大于0
,则通常应该是什么情况,如果GetQueuedCompletionStatus
是返回TRUE
。但如果我删除检查,我开始得到垃圾的缓冲区。
编辑:
我在调试过程中看到的一件事。如果我在if (complete && event_buf_len_ready_ == 0)
内部设置一个断点,然后查看OVERLAPPED
结构,我看到internal
属性的值为268。但我快速执行了net helpmsg 268
,但它没有找到该数字的错误消息。
1条答案
按热度按时间xurqigkl1#
这是有记录的行为。
字符串
在这种情况下,您必须假设更改非常大,因此从头开始重新读取目录是更好的方法。