我用C#编写了从Redis读取大量数据并使用异步等待将这些数据写入SQL Server的代码。
我在我的TeamController
类中使用async/await编写了以下代码,以便在向SQL Server写入数据的同时读取REDIS:
[Route("api/[controller]")]
[ApiController]
public class TeamController : ControllerBase
{
private ICacheManager cacheManager;
private IDBManager dbManager;
private IDomainDataConverter _domainDataConverter;
public TeamController(ICacheManager cacheManager,
IDBManager dbManager,
IDomainDataConverter domainDataConverter)
{
this.cacheManager = cacheManager;
this.dbManager = dbManager;
this._domainDataConverter = domainDataConverter;
}
[HttpPost, Route("SaveDataParallel")]
public async Task<IActionResult> SaveDataParallel(int parallelDegree, int totalCount)
{
int chunkeSize = totalCount / parallelDegree;
int remainder = totalCount - chunkeSize * parallelDegree;
System.Diagnostics.Stopwatch st = new System.Diagnostics.Stopwatch();
st.Start();
try
{
var tasks = new List<Task>();
for (int i = 0; i < parallelDegree; i++)
{
tasks.Add(SaveChunkAsync(i, chunkeSize, parallelDegree, remainder));
}
await Task.WhenAll(tasks);
st.Stop();
}
catch
{
}
return Ok(st.ElapsedMilliseconds);
}
[HttpPost, Route("SaveDataSimple")]
public IActionResult SaveDataWithSimple(int parallelDegree, int totalCount)
{
int chunkeSize = totalCount / parallelDegree;
int remainder = totalCount - chunkeSize * parallelDegree;
System.Diagnostics.Stopwatch st = new System.Diagnostics.Stopwatch();
st.Start();
try
{
for (int i = 0; i < parallelDegree; i++)
{
SaveChunk(i, chunkeSize, parallelDegree, remainder);
}
st.Stop();
}
catch (Exception ex)
{
}
return Ok(st.ElapsedMilliseconds);
}
private async Task SaveChunkAsync(int i, int pageSize, int parallelDegree, int remainder)
{
var data = cacheManager.ReadDataAsync<TeamDto>(i * pageSize, (i == parallelDegree - 1 ? remainder : 0) + pageSize);
var arr = _domainDataConverter.Convert<Team, TeamDto>(data.Result);
await dbManager.BulkInsertAsync(arr);
}
private void SaveChunk(int i, int pageSize, int parallelDegree, int remainder)
{
var data = cacheManager.ReadData<TeamDto>(i * pageSize, (i == parallelDegree - 1 ? remainder : 0) + pageSize);
var arr = _domainDataConverter.Convert<Team, TeamDto>(data);
dbManager.BulkInsert(arr);
}
}
我比较了两种方法的性能:SaveDataParallel
和SaveDataWithSimple
。不幸的是,我似乎看不出这两种方法调用在性能方面有什么显著的区别。
如果
n = total number of read and writes
td = time required to save chunk of data to SQL,
tr = time required to read chunk of data from Redis,
tparallel = total time for SaveDataParallel,
tsimple = total time for SaveDataWithSimple,
我期望tparallel
如下所示:
tparallel = (max(td, tr) * n) / 2 + tr
和tsimple
,如下所示:
tsimple = (max(td, tr)) * n
但是结果是不同的,并且两个tsimple
和tparallel
值看起来没有显著差异。
有人知道为什么吗?我期待的是正确的事情吗?还是代码有问题?
如有任何意见或指导,我将不胜感激。
1条答案
按热度按时间xdnvmnnf1#
async
方法的第一部分是同步执行的。如果被调用的方法返回了一个完成的任务,await
有时可以同步完成(参见 *"热路径"优化 * https://devblogs.microsoft.com/premier-developer/dissecting-the-async-methods-in-c/部分)。例如,
cacheManager.ReadDataAsync
可能快速完成任务并返回已完成的任务,而不是真正异步运行。那么_domainDataConverter.Convert
也将同步运行。for (int i = 0; i < parallelDegree; i++)
循环将占用ReadDataAsync
+Convert
的所有时间。是的,BulkInsertAsync
将并行运行,但是如果让Convert
也并行运行,您可以做得更好。比如:
另外,您还可以根据并行线程的数量将所有数据一次划分为多个部分,我建议将其划分为固定(可能可配置)长度的较小部分,然后使用
Parallel.ForEach
处理它们,在这种情况下,我将使用SaveCunk
或Parallel.ForEachAsync
与SaveChunkAsync
,尽管我还没有尝试过这种方法。选择什么取决于你是否有一些CPU的工作,并希望充分利用它,或者如果你主要是等待的东西。在前一种情况下,多线程方法可能会给你所需的控制加载所有的CPU核心。在后一种情况下,异步方法可能会更好。
然后,您可以使用不同的批处理大小和并行工作线程数来找到最佳组合。当我执行并行批量插入时,我使用
Parallel.ForEach
并使用4个线程。增加此限制并没有带来任何明显的好处。测试不同的实现
您可以运行以下程序来查看
SaveChunk
方法的可能时间线,具体取决于实现细节。我准备了ReadData、Convert和BulkInsert的模拟,以便它们分别花费40ms、200ms和40ms来模拟tr=24, td=4
情况。还有parallelDegree=10
。第一个变体是同步的,大约花费2800ms
。第二个是部分异步的,并且花费大约2440ms
,而第三个是异步的,并且花费大约560ms
。日志显示了所有3个变体的时间线。正如你所看到的,在第二个变体中,我们没有并行运行超过2个任务。但在第三个变体中,我们并行得多。