为什么我在WPF中的并行MySQL任务实现中使用TransactionScope和Task.WhenAll时会出错?

1wnzp6jl  于 2023-05-28  发布在  Mysql
关注(0)|答案(3)|浏览(360)

我的原始代码使用TransactionScope和Async/Await。在我的WPF应用程序上有一个DispatcherTimer,在tick中有一些代码,如下所示:

List<string> connectionString = new List<string>();
foreach (string connStr in connectionString)
{
    using (TransactionScope transactionScope = new TransactionScope(TransactionScopeAsyncFlowOption.Enabled))
    {
        try
        {
            await CallSQL1(connStr);
            await CallSQL2(connStr);
            transactionScope.Complete();
        }
        catch (Exception ex)
        {
            Console.WriteLine("Call Sql Error");
        }
    }
}

公共函数CallSQL 1和CallSQL 2如下

public async Task<int> CallSQL1(string connStr)
{
    try
    {
        using (MySqlConnection myConn = new MySqlConnection(connStr))
        {
            await myConn.OpenAsync();
            string sCmd = "INSERT INTO machine_info VALUES ('CCC','099','AOT','2020-02-02 02:00:00')";

            using (MySqlCommand myCmd = new MySqlCommand(sCmd, myConn))
            {
                int result = await myCmd.ExecuteNonQueryAsync();
                return 1;
            }
        }
    }
    catch (MySqlException ex)
    {
        throw new Exception("Error Occur");
    }
}

public async Task<int> CallSQL2(string connStr)
{
    try
    {
        using (MySqlConnection myConn = new MySqlConnection(connStr))
        {
            await myConn.OpenAsync();
            string sCmd = "INSERT INTO machine_status VALUES ('workpiece','10','IDLE','9','3030-03-03 03:00:00')";

            using (MySqlCommand myCmd = new MySqlCommand(sCmd, myConn))
            {
                int result = await myCmd.ExecuteNonQueryAsync();
                return 1;
            }
        }
    }
    catch (Exception ex)
    {
        throw new Exception("Error Occur");
    }
}

列表存储3个不同的数据库连接信息。每一个tick,我将连接到3个数据库,并调用2个函数(CallSQL 1,CallSQL 2),每个函数将插入数据到不同的表。如果发生错误,TransactionScope将确保它将回滚。现在一切都很好!
然后我在想,也许我可以改进它。使用Task.Add和Task. WhenAll使其成为并行工作。所以我修改代码如下:

List<Task> disTaskList = new List<Task>();
foreach (string connStr in connectionString)
{
    using (TransactionScope transactionScope = new TransactionScope(TransactionScopeAsyncFlowOption.Enabled))
    {
        try
        {
            disTaskList.Add(CallSQL1(connStr));
            disTaskList.Add(CallSQL2(connStr));
            transactionScope.Complete();
        }
        catch (Exception ex)
        {
            Console.WriteLine("Call Sql Error Excption = ");
        }
    }
    await Task.WhenAll(disTaskList);
}

现在错误显示在最后一行'await Task.WhenAll(disTaskList);'
System.Collections.Generic.KeyNotFoundException:'字典中不存在给定的键。'
我的想法有问题吗?我的想法是,如果有错误发生,那么transactionScope将回滚disTaskList。为这个数据库添加。在foreach 3数据库之后,我可以在disTaskList中等待其余的任务。
例如:

  • DB1CallSQL1ok-> disTaskList.Add(CallSQL1(connStr)),disTaskList.count = 1。
  • DB 1CallSQL 2 error -> transactionScope将回滚,disTaskList.count = 0。
  • DB2CallSQL1ok-> disTaskList.Add(CallSQL1(connStr)),disTaskList.count = 1。
  • DB2CallSQL2ok-> disTaskList.Add(CallSQL2(connStr)),disTaskList.count = 2。
  • DB3 CallSQL1 ok -> disTaskList.Add(CallSQL1(connStr)),disTaskList.count = 3。
  • DB3 CallSQL2 ok -> disTaskList.Add(CallSQL2(connStr)),disTaskList.count = 4。

有人能给予我一些建议吗?

7tofc5zh

7tofc5zh1#

可能的原因是您在两个SQL调用都完成之前就处理/完成了事务。
在单个事务中并行运行多个操作的想法对我来说似乎很奇怪,但我不是数据库Maven。您确定不应该为每个操作使用单独的事务范围吗?
你至少应该移动任务。在范围内等待:

disTaskList.Add(CallSQL1(connStr));
disTaskList.Add(CallSQL2(connStr));
await Task.WhenAll(disTaskList);
transactionScope.Complete();

或者只是跳过异步的东西,在我使用的MySql版本中,真正的异步操作不受支持,它们在实践中会阻塞。

cbwuti44

cbwuti442#

这些方法从根本上说是错误的--并发执行低效的查询通常会导致性能下降,因为连接会相互阻塞。对于实际尝试写入相同磁盘块的INSERT来说尤其如此。
在多个连接上使用TransactionScope要糟糕得多,因为它需要覆盖所有连接的分布式事务。这些连接将保持打开状态,直到分布式事务本身完成,从而增加了阻塞的机会。
在这种特定情况下,使用单个连接,性能可以提高几个数量级甚至几个数量级(从最慢到最快):
1.将多个INSERT命令批处理为一个。实际上,SQL字符串包含多个INSERT。这减少了冲突和网络延迟,例如:

INSERT INTO machine_info (..) VALUES ('CCC','099','AOT','2020-02-02 02:00:00');
INSERT INTO machine_info (..) VALUES ('CCC','099','AOT','2020-02-02 02:00:00');

1.创建包含多行的单个INSERT

INSERT INTO machine_info (..) 
VALUES 
    ('CCC','099','AOT','2020-02-02 02:00:00'),
    ('CCC','099','AOT','2020-02-02 02:00:00'),
    ('CCC','099','AOT','2020-02-02 02:00:00');

1.使用MySqlBulkCopyMySqlBulkLoader在后台使用MySQL的LOAD DATA导入数据

var bulkCopy = new MySqlBulkCopy(connection);
bulkCopy.DestinationTableName = "some_table_name";
var result = await bulkCopy.WriteToServerAsync(dataTable);

using var connection = new MySqlConnection("...;AllowLoadLocalInfile=True");
await connection.OpenAsync();
var bulkLoader = new MySqlBulkLoader(connection)
{
    FileName = @"C:\Path\To\file.csv",
    TableName = "destination",
    CharacterSet = "UTF8",
    NumberOfLinesToSkip = 1,
    FieldTerminator = ",",
    FieldQuotationCharacter = '"',
    FieldQuotationOptional = true,
    Local = true,
}
var rowCount = await bulkLoader.LoadAsync();
lx0bsm1f

lx0bsm1f3#

你最初的方法更好。使用Task.WhenAll的第二个解决方案在任务仍在执行时完成事务。
事务完成可能会在操作中被尝试,这是您实际上不希望的。
事务的全部意义在于确保数据库不会提交需要一起发生的操作序列的一部分。
如果你的CallSQL1CallSQL2不需要一起提交,那么你根本不需要事务。
如果您想坚持使用Task.WhenAll,那么您必须等待事务完成。

List<Task> disTaskList = new List<Task>();
foreach (string connStr in connectionString)
{
    using (TransactionScope transactionScope = new TransactionScope(TransactionScopeAsyncFlowOption.Enabled))
    {
        try
        {
            disTaskList.Add(CallSQL1(connStr));
            disTaskList.Add(CallSQL2(connStr));
            await Task.WhenAll(disTaskList);
            transactionScope.Complete();
        }
        catch (Exception ex)
        {
            Console.WriteLine("Call Sql Error Excption = ");
        }
    }
}

更新:

如果你希望对每个数据库的调用是并行的,而不仅仅是在单个数据库中的操作,那么你可以尝试这种方法:
创建一个方法,执行所有必需的DB调用。

private async Task CallSqlMethods(string connStr)
{
    using (TransactionScope transactionScope = new TransactionScope(TransactionScopeAsyncFlowOption.Enabled))
    {
        try
        {
            await CallSQL1(connStr);
            await CallSQL2(connStr);
            transactionScope.Complete();
        }
        catch (Exception ex)
        {
            Console.WriteLine("Call Sql Error Excption = ");
        }
    }
}

然后像这样调用该方法:

List<Task> disTaskList = new List<Task>();
foreach (string connStr in connectionString)
{
    disTaskList.Add(CallSqlMethods(connStr));
}
await Task.WhenAll(disTaskList);

相关问题