.net 使用异步等待从Redis读取多个批量数据并将其并行写入SQL Server的代码的性能问题

yqkkidmi  于 2023-01-18  发布在  .NET
关注(0)|答案(1)|浏览(112)

我用C#编写了从Redis读取大量数据并使用异步等待将这些数据写入SQL Server的代码。
我在我的TeamController类中使用async/await编写了以下代码,以便在向SQL Server写入数据的同时读取REDIS:

[Route("api/[controller]")]
[ApiController]
public class TeamController : ControllerBase
{
    private ICacheManager cacheManager;
    private IDBManager dbManager;
    private IDomainDataConverter _domainDataConverter;

    public TeamController(ICacheManager cacheManager,
                          IDBManager dbManager,
                          IDomainDataConverter domainDataConverter)
    {            
        this.cacheManager = cacheManager;
        this.dbManager = dbManager;
        this._domainDataConverter = domainDataConverter;
    } 
        
    [HttpPost, Route("SaveDataParallel")]
    public async Task<IActionResult> SaveDataParallel(int parallelDegree, int totalCount)
    {  
        int chunkeSize = totalCount / parallelDegree;
        int remainder = totalCount - chunkeSize * parallelDegree;

        System.Diagnostics.Stopwatch st = new System.Diagnostics.Stopwatch();
        st.Start();

        try
        {
            var tasks = new List<Task>();

            for (int i = 0; i < parallelDegree; i++)
            {
                tasks.Add(SaveChunkAsync(i, chunkeSize, parallelDegree, remainder));
            }

            await Task.WhenAll(tasks);

            st.Stop();
        }
        catch 
        {
        }

        return Ok(st.ElapsedMilliseconds);
    }

    [HttpPost, Route("SaveDataSimple")]
    public IActionResult SaveDataWithSimple(int parallelDegree,  int totalCount)
    {
        int chunkeSize = totalCount / parallelDegree;
        int remainder = totalCount - chunkeSize * parallelDegree;

        System.Diagnostics.Stopwatch st = new System.Diagnostics.Stopwatch();
        st.Start();

        try
        {
            for (int i = 0; i < parallelDegree; i++)
            {
                SaveChunk(i, chunkeSize, parallelDegree, remainder);                   
            }

            st.Stop();
        }
        catch (Exception ex)
        {
        }

        return Ok(st.ElapsedMilliseconds);
    }

    private async Task SaveChunkAsync(int i, int pageSize, int parallelDegree, int remainder)
    {
        var data = cacheManager.ReadDataAsync<TeamDto>(i * pageSize, (i == parallelDegree - 1 ? remainder : 0) + pageSize);
        var arr = _domainDataConverter.Convert<Team, TeamDto>(data.Result);
        await dbManager.BulkInsertAsync(arr);
    }

    private void SaveChunk(int i, int pageSize, int parallelDegree, int remainder)
    {
        var data = cacheManager.ReadData<TeamDto>(i * pageSize, (i == parallelDegree - 1 ? remainder : 0) + pageSize);
        var arr = _domainDataConverter.Convert<Team, TeamDto>(data);
        dbManager.BulkInsert(arr);
    }
}

我比较了两种方法的性能:SaveDataParallelSaveDataWithSimple。不幸的是,我似乎看不出这两种方法调用在性能方面有什么显著的区别。
如果

n  = total number of read and writes
td = time required to save chunk of data to SQL,
tr = time required to read chunk of data from Redis,
tparallel = total time for SaveDataParallel,
tsimple = total time for SaveDataWithSimple,

我期望tparallel如下所示:

tparallel = (max(td, tr) * n) / 2 + tr

tsimple,如下所示:

tsimple = (max(td, tr)) * n

但是结果是不同的,并且两个tsimpletparallel值看起来没有显著差异。
有人知道为什么吗?我期待的是正确的事情吗?还是代码有问题?
如有任何意见或指导,我将不胜感激。

xdnvmnnf

xdnvmnnf1#

async方法的第一部分是同步执行的。如果被调用的方法返回了一个完成的任务,await有时可以同步完成(参见 *"热路径"优化 * https://devblogs.microsoft.com/premier-developer/dissecting-the-async-methods-in-c/部分)。
例如,cacheManager.ReadDataAsync可能快速完成任务并返回已完成的任务,而不是真正异步运行。那么_domainDataConverter.Convert也将同步运行。for (int i = 0; i < parallelDegree; i++)循环将占用ReadDataAsync + Convert的所有时间。是的,BulkInsertAsync将并行运行,但是如果让Convert也并行运行,您可以做得更好。
比如:

private async Task SaveChunkAsync(int i, int pageSize,
    int parallelDegree, int remainder)
{
    var data = await cacheManager.ReadDataAsync<TeamDto>(i * pageSize,
        (i == parallelDegree - 1 ? remainder : 0) + pageSize);
    var arr = await _domainDataConverter.ConvertAsync<Team, TeamDto>(data);
    await dbManager.BulkInsertAsync(arr);
}

另外,您还可以根据并行线程的数量将所有数据一次划分为多个部分,我建议将其划分为固定(可能可配置)长度的较小部分,然后使用Parallel.ForEach处理它们,在这种情况下,我将使用SaveCunkParallel.ForEachAsyncSaveChunkAsync,尽管我还没有尝试过这种方法。
选择什么取决于你是否有一些CPU的工作,并希望充分利用它,或者如果你主要是等待的东西。在前一种情况下,多线程方法可能会给你所需的控制加载所有的CPU核心。在后一种情况下,异步方法可能会更好。
然后,您可以使用不同的批处理大小和并行工作线程数来找到最佳组合。当我执行并行批量插入时,我使用Parallel.ForEach并使用4个线程。增加此限制并没有带来任何明显的好处。

测试不同的实现

您可以运行以下程序来查看SaveChunk方法的可能时间线,具体取决于实现细节。我准备了ReadData、Convert和BulkInsert的模拟,以便它们分别花费40ms、200ms和40ms来模拟tr=24, td=4情况。还有parallelDegree=10。第一个变体是同步的,大约花费2800ms。第二个是部分异步的,并且花费大约2440ms,而第三个是异步的,并且花费大约560ms

using System.Diagnostics;

namespace SaveChunks
{
    internal class Program
    {
        static async Task Main(string[] args)
        {
            int parallelDegree = 10;

            Console.WriteLine("Simple:");
            var target = new DataSaver();
            target.SaveDataSimple(parallelDegree);

            Console.WriteLine("Partially Async:");
            await target.SaveDataPartiallyAsync(parallelDegree);

            Console.WriteLine("Async:");
            await target.SaveDataAsync(parallelDegree);
        }
    }

    internal class DataSaver
    {
        private Stopwatch _timer = new Stopwatch();

        public void SaveDataSimple(int parallelDegree)
        {
            _timer.Restart();

            for (int i = 0; i < parallelDegree; i++)
                SaveChunk(i);

            _timer.Stop();
        }

        public async Task SaveDataPartiallyAsync(int parallelDegree)
        {
            _timer.Restart();

            var tasks = new List<Task>();

            for (int i = 0; i < parallelDegree; i++)
                tasks.Add(SaveChunkPartiallyAsync(i));

            await Task.WhenAll(tasks);

            _timer.Stop();
        }

        public async Task SaveDataAsync(int parallelDegree)
        {
            _timer.Restart();

            var tasks = new List<Task>();

            for (int i = 0; i < parallelDegree; i++)
                tasks.Add(SaveChunkAsync(i));

            await Task.WhenAll(tasks);

            _timer.Stop();
        }

        private void SaveChunk(int i)
        {
            var data = ReadData(i);
            var converted = Convert(i, data);
            BulkInsert(i, converted);
        }

        private async Task SaveChunkPartiallyAsync(int i)
        {
            var data = await ReadDataPartiallyAsync(i);
            var converted = Convert(i, data);
            await BulkInsertAsync(i, converted);
        }

        private async Task SaveChunkAsync(int i)
        {
            var data = await ReadDataAsync(i);
            var converted = await ConvertAsync(i, data);
            await BulkInsertAsync(i, converted);
        }

        // Synchronous implementation

        private int ReadData(int i)
        {
            Log(i, "ReadData start");
            Pause(40);
            Log(i, "ReadData end");
            return 0;
        }

        private int Convert(int i, int data)
        {
            Log(i, "Convert start");
            Pause(200);
            Log(i, "Convert end");
            return 0;
        }

        private void BulkInsert(int i, int data)
        {
            Log(i, "BulkInsert start");
            Pause(40);
            Log(i, "BulkInsert end");
        }

        // Partially-asynchronous implementation

        private Task<int> ReadDataPartiallyAsync(int i)
        {
            return Task.FromResult(ReadData(i));
        }

        private Task<int> ConvertAsync(int i, int data)
        {
            return Task.Run(() => Convert(i, data));
        }

        private Task BulkInsertAsync(int i, int data)
        {
            return Task.Run(() => BulkInsert(i, data));
        }

        // Asyncronous implementation

        private Task<int> ReadDataAsync(int i)
        {
            return Task.Run(() => ReadData(i));
        }

        // Helper methods

        private void Pause(int ms)
        {
            var start = _timer.ElapsedMilliseconds;
            var sum = 0;
            while (_timer.ElapsedMilliseconds - start < ms)
                sum += sum * sum;
        }

        private void Log(int i, string message)
        {
            char c = message[0];
            Console.WriteLine($"{_timer.ElapsedMilliseconds,5} {new string(' ', i * 2)}{c}");
        }

        private void Log2(int i, string message)
        {
            Console.WriteLine($"{_timer.ElapsedMilliseconds}\t{i}\t{message}");
        }
    }
}

日志显示了所有3个变体的时间线。正如你所看到的,在第二个变体中,我们没有并行运行超过2个任务。但在第三个变体中,我们并行得多。

Simple:
    1 R
   43 R
   43 C
  243 C
  243 B
  283 B
  283   R
  323   R
  323   C
  523   C
  523   B
  563   B
  563     R
  603     R
  603     C
  803     C
  803     B
  843     B
  843       R
  883       R
  883       C
 1083       C
 1083       B
 1123       B
 1123         R
 1163         R
 1163         C
 1363         C
 1363         B
 1403         B
 1403           R
 1443           R
 1443           C
 1643           C
 1643           B
 1683           B
 1683             R
 1723             R
 1723             C
 1923             C
 1923             B
 1963             B
 1963               R
 2003               R
 2003               C
 2203               C
 2203               B
 2243               B
 2243                 R
 2283                 R
 2283                 C
 2483                 C
 2483                 B
 2523                 B
 2523                   R
 2563                   R
 2563                   C
 2763                   C
 2763                   B
 2803                   B
Partially Async:
    0 R
   40 R
   40 C
  240 C
  244   R
  244 B
  284 B
  284   R
  284   C
  484   C
  484     R
  484   B
  524   B
  524     R
  524     C
  724     C
  724       R
  724     B
  764       R
  764       C
  764     B
  964       C
  964         R
  964       B
 1004         R
 1004         C
 1004       B
 1204         C
 1204           R
 1204         B
 1244           R
 1244           C
 1244         B
 1444           C
 1444             R
 1444           B
 1484             R
 1484             C
 1484           B
 1684             C
 1684             B
 1684               R
 1724               R
 1724               C
 1724             B
 1924               C
 1924                 R
 1924               B
 1964                 R
 1964                 C
 1964               B
 2164                 C
 2164                   R
 2164                 B
 2204                   R
 2204                   C
 2204                 B
 2404                   C
 2404                   B
 2444                   B
Async:
    1 R
    2   R
    2       R
    2         R
    2           R
    3     R
   27             R
   34                   R
   41 R
   41 C
   42       R
   42         R
   42       C
   42         C
   43     R
   43     C
   43           R
   43           C
   44   R
   45   C
   72             R
   72             C
   79                   R
   82                   C
  241 C
  241 B
  242         C
  242       C
  242         B
  242       B
  243     C
  243     B
  245   C
  245   B
  251           C
  251           B
  272             C
  272             B
  281 B
  281               R
  282                   C
  282       B
  282                   B
  282                 R
  283     B
  283         B
  285   B
  294           B
  312             B
  321               R
  321               C
  322                   B
  322                 R
  322                 C
  521               C
  521               B
  522                 C
  522                 B
  561               B
  562                 B

相关问题