.NET: Wrapping a ManualResetEvent as an awaitable Task

Posted by pprl5pva on 2023-05-30 in .NET

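I'd like to await a manual reset event with a timeout while also observing cancellation. I came up with the code below. The manual reset event object is provided by an API that is outside my control. Is there a way to do this without taking up and blocking a thread from the ThreadPool?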

static Task<bool> TaskFromWaitHandle(WaitHandle mre, int timeout, CancellationToken ct)
{
    return Task.Run(() =>
    {
        bool s = WaitHandle.WaitAny(new WaitHandle[] { mre, ct.WaitHandle }, timeout) == 0;
        ct.ThrowIfCancellationRequested();
        return s;
    }, ct);
}

// ...

if (await TaskFromWaitHandle(manualResetEvent, 1000, cts.Token))
{
    // true if event was set
}
else 
{
    // false if timed out, exception if cancelled 
}

**[EDITED]** Apparently, it makes sense to use RegisterWaitForSingleObject. I'll give it a try.

Answer #1 (toiithl6)

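RegisterWaitForSingleObject combines waits onto dedicated waiter threads, each of which can wait on multiple handles (specifically, 63 of them, which is MAXIMUM_WAIT_OBJECTS minus one for a "control" handle).
So you should be able to use something like this (warning: untested):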

public static class WaitHandleExtensions
{
    public static Task AsTask(this WaitHandle handle)
    {
        return AsTask(handle, Timeout.InfiniteTimeSpan);
    }

    public static Task AsTask(this WaitHandle handle, TimeSpan timeout)
    {
        var tcs = new TaskCompletionSource<object>();
        var registration = ThreadPool.RegisterWaitForSingleObject(handle, (state, timedOut) =>
        {
            var localTcs = (TaskCompletionSource<object>)state;
            if (timedOut)
                localTcs.TrySetCanceled();
            else
                localTcs.TrySetResult(null);
        }, tcs, timeout, executeOnlyOnce: true);
        tcs.Task.ContinueWith((_, state) => ((RegisteredWaitHandle)state).Unregister(null), registration, TaskScheduler.Default);
        return tcs.Task;
    }
}

Answer #2 (hc8w905p)

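If you control the synchronization primitive, you can also use SemaphoreSlim.WaitAsync(), which accepts a timeout and a CancellationToken. It is similar to a ManualResetEvent but not identical: each successful wait consumes one count, so the semaphore does not stay signaled for every waiter.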
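
A minimal sketch of this suggestion, assuming you own the primitive and can use a SemaphoreSlim in place of the event (which is not the case for the API-provided event in the question). The class and method names are hypothetical and only serve the illustration.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class SemaphoreSlimWaitSketch
{
    // Hypothetical helper: true if the semaphore was released before the timeout,
    // false on timeout. Throws OperationCanceledException if the token is canceled first.
    public static Task<bool> WaitForSignalAsync(SemaphoreSlim signal, int timeoutMs, CancellationToken ct)
    {
        // SemaphoreSlim.WaitAsync(int, CancellationToken) does not block a thread while waiting.
        return signal.WaitAsync(timeoutMs, ct);
    }

    public static async Task<string> DemoAsync()
    {
        // Initial count 0 means "not signaled"; a max count of 1 keeps it binary.
        using var signal = new SemaphoreSlim(0, 1);
        using var cts = new CancellationTokenSource();

        bool first = await WaitForSignalAsync(signal, 50, cts.Token);  // nobody released yet: times out
        signal.Release();                                               // "set" the signal
        bool second = await WaitForSignalAsync(signal, 50, cts.Token); // consumes the single count
        bool third = await WaitForSignalAsync(signal, 50, cts.Token);  // count is 0 again: times out

        return $"{first} {second} {third}";
    }
}
```

The third wait times out because the second one consumed the count. A ManualResetEvent would have stayed signaled until Reset was called, which is the main behavioral difference to keep in mind.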

Answer #3 (iswrvxsc)

Stephen Cleary's solution looks perfect. Microsoft provides a similar one.
Since I haven't seen an example with cancellation logic yet, here it is:

public static class WaitHandleExtensions
{
    public static Task WaitOneAsync(this WaitHandle waitHandle, CancellationToken cancellationToken, int timeoutMilliseconds = Timeout.Infinite)
    {
        if (waitHandle == null)
            throw new ArgumentNullException(nameof(waitHandle));

        TaskCompletionSource<bool> tcs = new TaskCompletionSource<bool>();
        CancellationTokenRegistration ctr = cancellationToken.Register(() => tcs.TrySetCanceled());
        TimeSpan timeout = timeoutMilliseconds > Timeout.Infinite ? TimeSpan.FromMilliseconds(timeoutMilliseconds) : Timeout.InfiniteTimeSpan;

        RegisteredWaitHandle rwh = ThreadPool.RegisterWaitForSingleObject(waitHandle,
            (_, timedOut) =>
            {
                if (timedOut)
                {
                    tcs.TrySetCanceled();
                }
                else
                {
                    tcs.TrySetResult(true);
                }
            }, 
            null, timeout, true);

        Task<bool> task = tcs.Task;

        _ = task.ContinueWith(_ =>
        {
            rwh.Unregister(null);
            return ctr.Unregister();
        }, CancellationToken.None);

        return task;
    }
}
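
The code above reports a timeout as a canceled task. The question instead wants true when the event is set, false on timeout, and an exception on cancellation. A rough sketch of that variant follows; the name WaitOneBoolAsync is hypothetical, and like the code above it is an illustration rather than a hardened implementation.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class WaitHandleBoolExtensions
{
    // Hypothetical helper: completes with true when the handle is signaled,
    // false on timeout, and as a canceled task when the token fires first.
    public static Task<bool> WaitOneBoolAsync(this WaitHandle handle, int timeoutMs, CancellationToken ct)
    {
        if (handle == null) throw new ArgumentNullException(nameof(handle));
        if (ct.IsCancellationRequested) return Task.FromCanceled<bool>(ct);

        var tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);

        // The callback runs on a thread-pool thread only once the wait completes or times out.
        RegisteredWaitHandle rwh = ThreadPool.RegisterWaitForSingleObject(
            handle,
            (state, timedOut) => ((TaskCompletionSource<bool>)state).TrySetResult(!timedOut),
            tcs, timeoutMs, executeOnlyOnce: true);

        // Cancellation completes the task as canceled; whichever TrySet* call comes first wins.
        CancellationTokenRegistration ctr = ct.Register(() => tcs.TrySetCanceled(ct));

        // Release both registrations once the task has finished for any reason.
        tcs.Task.ContinueWith(_ =>
        {
            rwh.Unregister(null);
            ctr.Dispose();
        }, TaskScheduler.Default);

        return tcs.Task;
    }
}
```

With this shape, `if (await manualResetEvent.WaitOneBoolAsync(1000, cts.Token))` behaves like the snippet in the question, but no thread is blocked while waiting.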

Answer #4 (eufgjt7s)

You can give https://www.badflyer.com/asyncmanualresetevent a try. It builds on the example at https://blogs.msdn.com/b/pfxteam/archive/2012/02/11/10266920.aspx to add support for timeouts and cancellation.

using System;
using System.Threading;
using System.Threading.Tasks;

/// <summary>
/// An async manual reset event.
/// </summary>
public sealed class ManualResetEventAsync
{
    // Inspiration from https://devblogs.microsoft.com/pfxteam/building-async-coordination-primitives-part-1-asyncmanualresetevent/
    // and the .net implementation of SemaphoreSlim

    /// <summary>
    /// The timeout in milliseconds to wait indefinitely.
    /// </summary>
    private const int WaitIndefinitly = -1;

    /// <summary>
    /// True to run synchronous continuations on the thread which invoked Set. False to run them in the threadpool.
    /// </summary>
    private readonly bool runSynchronousContinuationsOnSetThread = true;

    /// <summary>
    /// The current task completion source.
    /// </summary>
    private volatile TaskCompletionSource<bool> completionSource = new TaskCompletionSource<bool>();

    /// <summary>
    /// Initializes a new instance of the <see cref="ManualResetEventAsync"/> class.
    /// </summary>
    /// <param name="isSet">True to set the task completion source on creation.</param>
    public ManualResetEventAsync(bool isSet)
        : this(isSet: isSet, runSynchronousContinuationsOnSetThread: true)
    {
    }

    /// <summary>
    /// Initializes a new instance of the <see cref="ManualResetEventAsync"/> class.
    /// </summary>
    /// <param name="isSet">True to set the task completion source on creation.</param>
    /// <param name="runSynchronousContinuationsOnSetThread">If you have synchronous continuations, they will run on the thread which invokes Set, unless you set this to false.</param>
    public ManualResetEventAsync(bool isSet, bool runSynchronousContinuationsOnSetThread)
    {
        this.runSynchronousContinuationsOnSetThread = runSynchronousContinuationsOnSetThread;

        if (isSet)
        {
            this.completionSource.TrySetResult(true);
        }
    }

    /// <summary>
    /// Wait for the manual reset event.
    /// </summary>
    /// <returns>A task which completes when the event is set.</returns>
    public Task WaitAsync()
    {
        return this.AwaitCompletion(ManualResetEventAsync.WaitIndefinitly, default(CancellationToken));
    }

    /// <summary>
    /// Wait for the manual reset event.
    /// </summary>
    /// <param name="token">A cancellation token.</param>
    /// <returns>A task which waits for the manual reset event.</returns>
    public Task WaitAsync(CancellationToken token)
    {
        return this.AwaitCompletion(ManualResetEventAsync.WaitIndefinitly, token);
    }

    /// <summary>
    /// Wait for the manual reset event.
    /// </summary>
    /// <param name="timeout">A timeout.</param>
    /// <param name="token">A cancellation token.</param>
    /// <returns>A task which waits for the manual reset event. Returns true if the timeout has not expired. Returns false if the timeout expired.</returns>
    public Task<bool> WaitAsync(TimeSpan timeout, CancellationToken token)
    {
        return this.AwaitCompletion((int)timeout.TotalMilliseconds, token);
    }

    /// <summary>
    /// Wait for the manual reset event.
    /// </summary>
    /// <param name="timeout">A timeout.</param>
    /// <returns>A task which waits for the manual reset event. Returns true if the timeout has not expired. Returns false if the timeout expired.</returns>
    public Task<bool> WaitAsync(TimeSpan timeout)
    {
        return this.AwaitCompletion((int)timeout.TotalMilliseconds, default(CancellationToken));
    }

    /// <summary>
    /// Set the completion source.
    /// </summary>
    public void Set()
    {
        if (this.runSynchronousContinuationsOnSetThread)
        {
            this.completionSource.TrySetResult(true);
        }
        else
        {
            // Run synchronous completions in the thread pool.
            Task.Run(() => this.completionSource.TrySetResult(true));
        }
    }

    /// <summary>
    /// Reset the manual reset event.
    /// </summary>
    public void Reset()
    {
        // Grab a reference to the current completion source.
        var currentCompletionSource = this.completionSource;

        // Check if there is nothing to be done, return.
        if (!currentCompletionSource.Task.IsCompleted)
        {
            return;
        }

        // Otherwise, try to replace it with a new completion source (if it is the same as the reference we took before).
        Interlocked.CompareExchange(ref this.completionSource, new TaskCompletionSource<bool>(), currentCompletionSource);
    }

    /// <summary>
    /// Await completion based on a timeout and a cancellation token.
    /// </summary>
    /// <param name="timeoutMS">The timeout in milliseconds.</param>
    /// <param name="token">The cancellation token.</param>
    /// <returns>A task (true if wait succeeded). (False on timeout).</returns>
    private async Task<bool> AwaitCompletion(int timeoutMS, CancellationToken token)
    {
        // Validate arguments. An int can never exceed int.MaxValue, so only the lower bound is checked.
        if (timeoutMS < -1)
        {
            throw new ArgumentException("The timeout must be either -1ms (indefinitely) or a non-negative ms value <= int.MaxValue");
        }

        CancellationTokenSource timeoutToken = null;

        // If the token cannot be cancelled, then we don't need to create any sort of linked token source.
        if (false == token.CanBeCanceled)
        {
            // If the wait is indefinite, then we don't need to create a second task at all to wait on, just wait for set. 
            if (timeoutMS == -1)
            {
                return await this.completionSource.Task;
            }

            timeoutToken = new CancellationTokenSource();
        }
        else
        {
            // A token source which will get canceled either when we cancel it, or when the linked token source is canceled.
            timeoutToken = CancellationTokenSource.CreateLinkedTokenSource(token);
        }

        using (timeoutToken)
        {
            // Create a task to account for our timeout. The continuation just eats the task cancelled exception, but makes sure to observe it.
            Task delayTask = Task.Delay(timeoutMS, timeoutToken.Token).ContinueWith((result) => { var e = result.Exception; }, TaskContinuationOptions.ExecuteSynchronously);

            var resultingTask = await Task.WhenAny(this.completionSource.Task, delayTask).ConfigureAwait(false);

            // The actual task finished, not the timeout, so we can cancel our cancellation token and return true.
            if (resultingTask != delayTask)
            {
                // Cancel the timeout token to cancel the delay if it is still going.
                timeoutToken.Cancel();
                return true;
            }

            // Otherwise, the delay task finished. So throw if it finished because it was canceled.
            token.ThrowIfCancellationRequested();
            return false;
        }
    }
}

Answer #5 (2cmtqfgy)

Alternative solution: wait on the handles of the Task and the manual reset event

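I was getting memory leaks when using Task.WaitAny() with a Task (returned by SqlConnection.OpenAsync()) and a manual reset event that was received as a parameter and wrapped in a Task with AsTask(). These objects were not being released: TaskCompletionSource<Object>, Task<Object>, StandardTaskContinuation, RegisteredWaitHandle, RegisteredWaithandleSafe, ContinuationResultTaskFromresultTask<Object,bool>, _ThreadPoolWaitOrTimerCallback.
This is real production code from a function used in a Windows service. The function tries to open a connection to a db in a loop until the connection is opened, or the operation fails, or the ManualResetEvent _finishRequest (received as a parameter by the function containing this code) is signaled by code on any other thread.
To avoid the leak, I decided to do it the other way round: wait on the handles of _finishRequest and of the Task returned by OpenAsync():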

Task asyncOpening = sqlConnection.OpenAsync();

// Wait for the async open to finish, or until _finishRequest is signaled
var waitHandles = new WaitHandle[]
{
  // index 0 in array: extract the AsyncWaitHandle from the Task
  ((IAsyncResult)asyncOpening).AsyncWaitHandle,
  // index 1:
  _finishRequest
};

// Check if finish was requested (index of signaled handle in the array = 1)
int whichFinished = WaitHandle.WaitAny(waitHandles);
finishRequested = whichFinished == 1;
// If so, break the loop to exit immediately
if (finishRequested)
  break;
                    
// If not, check if OpenAsync finished with error (it's a Task)
if (asyncOpening.IsFaulted)
{
  // Extract the exception from the task, and throw it
  // NOTE: adapt it to your case. In mine, I'm interested in the inner exception,
  // but you can check the exception itself, for example to see if it was a timeout,
  // if you specified it in the call to the async function that returns the Task
  var ex = asyncOpening?.Exception?.InnerExceptions?[0];
  if (ex != null) throw ex; 
}
else
{
  Log.Verbose("Connection to database {Database} on server {Server}", database, server);
  break;
}

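If you also need a timeout, you can include it in the call to OpenAsync, or in your own async function, and then check whether the async operation was cancelled because of the timeout: check the status of the Task once it has finished, as described in the NOTE in the code comments.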
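
As a different option from the one described above, the timeout can also be placed on the wait itself, since WaitHandle.WaitAny has an overload that takes a timeout in milliseconds and returns WaitHandle.WaitTimeout when it expires. The sketch below uses hypothetical names and, like the code in this answer, blocks the calling thread while it waits.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class WaitAnyTimeoutSketch
{
    // Hypothetical helper: reports what ended the wait as "task", "finish", or "timeout".
    public static string WaitForTaskOrFinish(Task work, WaitHandle finishRequest, int timeoutMs)
    {
        var handles = new WaitHandle[]
        {
            ((IAsyncResult)work).AsyncWaitHandle, // index 0: signaled when the task completes
            finishRequest                         // index 1: signaled when finish is requested
        };

        // Blocks the current thread until a handle is signaled or the timeout elapses.
        int index = WaitHandle.WaitAny(handles, timeoutMs);

        if (index == WaitHandle.WaitTimeout) return "timeout";
        return index == 1 ? "finish" : "task";
    }
}
```

After a "task" result the caller would still inspect the task's status (IsFaulted, IsCanceled) in the same way as the code in this answer does.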

Answer #6 (ttp71kqs)

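Install the Microsoft.VisualStudio.Threading package, and you will then be able to use the AsyncManualResetEvent class, whose own documentation states:
A flavor of ManualResetEvent that can be asynchronously awaited on.
Documentation: https://learn.microsoft.com/en-us/dotnet/api/microsoft.visualstudio.threading.asyncmanualresetevent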
