SQL Server How to run multi threads in a sequence without tracking or knowing if there any threads existing before starting a new thread

mgdq6dx1  于 2023-10-15  发布在  其他
关注(0)|答案(1)|浏览(116)

I have a situation where my program performs these two tasks

  1. Fetch data via ODBC for 96 tables one after the other.
    a. Create insert commands sql file.
    b. Execute the inserts-script via SqlCmd command line utility.

I need to enhance this process so as I can utilize the full potential of the CPU via multi-threading

The ideal solution I am trying to implement is as such. The main thread will keep fetching the data via ODBC where as upon each table's data is fetched and stored in a file on disk the main thread then initiates a new Process of SqlCMD utility which inserts the data into SQL server and the main thread goes on to fetching the next table via ODBC.

The problem or the goal that I want to achieve is that I want to make sure that the new Process of SqlCmd start in a queue/sequence only when the prior processese of SqlCMD are finished

This method of exportOdbcToSQLDB is called in an outer loop 96 times.

private void exportOdbcToSQLDB(string tableName, SqlConnection sqlCon, OdbcConnection odbcCon)
 {
     InsertAndValidate insertValidateObj = new InsertAndValidate();
     insertValidateObj.sqlCon = sqlCon;
     insertValidateObj.odcConnection = odbcCon;
     insertValidateObj.logTableName = "_Log";

     if (string.IsNullOrEmpty(tableName))
         return;
     String columns = "";
     string tmpSelect = "SELECT * FROM [" + tableName + "]";
     using (var adapter = new SqlDataAdapter(tmpSelect, sqlCon))
     using (var builder = new SqlCommandBuilder(adapter))
     {
         columns = builder.GetInsertCommand().CommandText;
         columns = columns.Split(new String[] { ")" }, StringSplitOptions.None)[0];
         columns = columns.Split(new String[] { "(" }, StringSplitOptions.None)[1];
         columns = columns.Trim();
         Application.DoEvents();
     }

     int totalCount = 0;
     DateTime startTime = DateTime.Now;
     string strInsert = "";
     System.IO.File.WriteAllText("sql_" + tableName + ".sql", "");

     string tableFilePath = Path.Combine(Config.qbTablesDir, tableName + ".sql");
     using (StreamWriter writer = new StreamWriter(tableFilePath, true))
     {
         writer.WriteLine("SET NOCOUNT ON" + Helper1.Go());
         writer.Write("Update _Log set Remarks  = Concat(Remarks, '| Insert Start time ',dbo.dNow())" + Helper1.Go());
         string insertTemp = "Insert into [" + tableName + "] (" + columns + ") Values ";
         string queryString = @"select " + columns + " from [" + tableName + "]";

         insertValidateObj.addTableState(tableName);
         insertValidateObj.ResetStats(tableName);
         Application.DoEvents();

         OdbcCommand command = new OdbcCommand(queryString, insertValidateObj.odcConnection);
         using (OdbcDataReader reader = command.ExecuteReader())
         {
             int colCount = reader.FieldCount;
             if (reader.HasRows)
             {
                 try
                 {
                     while (reader.Read())
                     {
                         totalCount++;
                         writer.Write(insertTemp + this.GenerateInsert(reader, tableName, columns) + Helper1.Go());
                     }
                 }
                 catch (Exception ex)
                 {
                     string exception1 = (String.Concat("Row ", totalCount, ex.ToString()));
                 }
                 Application.DoEvents();
             }
             insertTemp = "Update _Log set Remarks  = Concat(Remarks, '| Insert End time ',dbo.dNow())" + Helper1.Go();
             insertTemp = "Update _log set SuccessRecords = (Select count(1) from [" + tableName + "]) where TableName = '" + tableName + "'" + Helper1.Go();
             writer.Write(insertTemp);
         }
     }
     DateTime endTime = DateTime.Now;
     insertValidateObj.SetStats(tableName, totalCount, 0, 0, startTime, endTime);

     if (totalCount > 0)
     {
         tablesToInsert.Enqueue(tableName);
         ExecuteQbInserts(tableName);
     }
 }

This method is called in the above method

public static void ExecuteQbInserts(string tableName)
 {
     string tmpPathInput = Path.Combine(Config.qbTablesDir, tableName + ".sql");
     string tmpPathOutput = Path.Combine(Config.qbSqlInsertsLogsDir, tableName + "_log.txt");

     string command = string.Concat("sqlcmd -S . -d qb", Config.ClientName, " -U sa -P atiqf -i \"", tmpPathInput, "\" -o \"", tmpPathOutput, "\"");
     ProcessStartInfo processInfo;
     processInfo = new ProcessStartInfo("cmd.exe", "/c " + command);
     processInfo.CreateNoWindow = false;
     processInfo.UseShellExecute = false;
     Process.Start(processInfo);
 }

Result:

Multiple SqlCMD processes runs when the rate of sqlCmd inserts is less than the rate at which the main thread fetches the ODBC data.

Solution required: Have the the main thread keeps getting data of 96 tables at its own rate and have sqlCMD processes run one after the other.

pdsfdshx

pdsfdshx1#

I think that a sufficient solution to your problem would be to parallelize the ExportOdbcToSQLDB and ExecuteQbInserts in this way: The ExportOdbcToSQLDB for the table i will be executed in parallel with the ExecuteQbInserts for the table i - 1 . Assuming that one of the two operations is consistently slower than the other, the total amount of time for processing all tables will be equal with the sum of the slower operation. The faster operation will travel by hitchhiking, so to say, without affecting the total execution time.

The ParallelizeTwoActions method below implements this scheme in an abstract generic way:

/// <remarks>
/// Invokes two actions for each element in the source sequence, sequentially.
/// The action1 is invoked sequentially for one element at a time.
/// The action2 is also invoked sequentially for one element at a time.
/// The action1 for an element is invoked in parallel with the action2 for its
/// previous element.
/// </remarks>
public static async Task ParallelizeTwoActions<TSource>(IEnumerable<TSource> source,
    Action<TSource> action1, Action<TSource> action2)
{
    ArgumentNullException.ThrowIfNull(source);
    ArgumentNullException.ThrowIfNull(action1);
    ArgumentNullException.ThrowIfNull(action2);

    Task task1 = Task.CompletedTask;
    Task task2 = Task.CompletedTask;
    try
    {
        foreach (TSource item in source)
        {
            task1 = Task.Run(() => action1(item));
            await Task.WhenAll(task1, task2).ConfigureAwait(false);
            task2 = Task.Run(() => action2(item));
        }
    }
    finally
    {
        await Task.WhenAll(task1, task2).ConfigureAwait(false);
    }
}

The marble diagram below illustrates how the action1 and action2 are invoked for six elements A, B, C, D, E and F.

action1: A------| B----------| C-----|    D---------| E-------| F-------|
action2:          A----|       B--------| C---|       D----|    E----|    F----|

Notice how the action1(B) overlaps with the action2(A) .

You could use the ParallelizeTwoActions like this:

private async void btnExecute_Click(object sender, EventArgs e)
{
    try
    {
        Cursor = Cursors.WaitCursor;
        btnExecute.Enabled = false;

        await ParallelizeTwoActions(tableNames, tableName =>
        {
            ExportOdbcToSQLDB(tableName, sqlCon, odbcCon);
        }, tableName =>
        {
            ExecuteQbInserts(tableName);
        });
    }
    finally
    {
        btnExecute.Enabled = true;
        Cursor = Cursors.Default;
    }
}

For this to work, you must do a change at the bottom of the ExecuteQbInserts :

Process process = Process.Start(processInfo);
process.WaitForExit(); /* Block the current thread until the process terminates */

You must also remove all the Application.DoEvents(); lines, because both the ExportOdbcToSQLDB and the ExecuteQbInserts will be invoked on ThreadPool threads. You should avoid any interaction with UI controls inside these methods. It is strictly forbidden to interact with UI controls from any thread other than the UI thread.

In case you are not familiar with async / await , you might want to read some tutorials, like this or this . It's a really great way to keep your UI responsive by offloading work to background threads, without complicating your code with manual Thread management, or with awkward BackgroundWorker s etc.

相关问题