如何在Azure Functions中处理来自多个服务总线队列消息触发器的数据库连接?

lokaqttq  于 2023-06-30  发布在  其他
关注(0)|答案(1)|浏览(114)

我正在使用Azure Functions和Service Bus队列消息触发器来接收队列消息,并使用Entity Framework处理它以存储在Azure SQL中。这会正确处理一些队列消息,但对于一些队列消息,它会引发以下异常:
Result: An exception occurred while iterating over the results of a query for context type 'webhooksecretply.DatabaseModels.igdbapi.IGDBAPIAppContext'. System.InvalidOperationException: There is already an open DataReader associated with this Connection which must be closed first.
作为参考,通过以下行打开数据库连接:

Program.cs

s.AddDbContext<IGDBAPIAppContext>(options => options.UseSqlServer("connectionstring"));

我已经尝试查看堆栈跟踪,但无法准确地指出引发此异常的具体代码。从我所能找到的,它总是抛出这个DbCommand错误:

Failed executing DbCommand (0ms) [Parameters=[p0='?' (Size = 4000), @__queueMessageProcessingRecord_Endpoint_1='?' (Size = 50), @__queueMessageProcessingRecord_Method_2='?' (Size = 50)], CommandType='Text', CommandTimeout='300']
SELECT [w].[id], [w].[endpoint], [w].[igdb_id], [w].[method], [w].[timestamp_added]
FROM (
    SELECT * FROM queuemessageprocessing WHERE timestamp_added >= @p0
) AS [w]
WHERE @__queueMessageProcessingRecord_Endpoint_1 = [w].[endpoint] AND @__queueMessageProcessingRecord_Method_2 = [w].[method]

这是使用queuemessageprocessing表的唯一一行,也是第一次使用上下文_IGDBAPIAppContext,所以在这一行之前不应该有_IGDBAPIAppContext的开放DataReader:

ProcessQueueMessages.cs

List<DatabaseModels.igdbapi.QueueMessageProcessing> queueMessageProcessingResults = _IGDBAPIAppContext.QueueMessageProcessing.FromSqlRaw("SELECT * FROM queuemessageprocessing WHERE timestamp_added >= {0}", DateTime.UtcNow.AddMinutes(-10).ToString("yyyy-MM-dd HH:mm:ss")).Where(result => Equals(queueMessageProcessingRecord.Endpoint, result.Endpoint) && Equals(queueMessageProcessingRecord.Method, result.Method)).ToList();

根据我对System.InvalidOperationException异常的理解,Entity Framework将返回IQueryable数据类型,从而保持DataReader打开,但不应该是这种情况,因为查询是用.ToList()枚举的。我对发生此异常的原因的最佳猜测是,当队列消息在ProcessQueueMessages.cs中执行其他查询时,其他队列消息正在触发对同一数据库连接的服务总线队列消息触发器。我的假设可能是完全错误的,所以我非常感谢任何可能导致异常抛出的其他见解,我可以在Azure门户或Application Insights中查看。

更新1

我已经尝试调整host.json中消耗的队列消息量,但仍然抛出异常。

{
    "extensions": {
      "queues": {
        "batchSize": 1,
        "newBatchThreshold": 0
      }
    }
}

我还尝试在应用程序服务应用程序设置中添加WEBSITE_MAX_DYNAMIC_APPLICATION_SCALE_OUT = 1
这些更改来自以下StackOverflow帖子:How to limit concurrent Azure Function executions

更新2

我还尝试将MultipleActiveResultSets=true添加到连接字符串中,但收到了这个新的异常:
Result: An exception occurred while iterating over the results of a query for context type 'webhooksecretply.DatabaseModels.igdbapi.IGDBAPIAppContext'. System.InvalidOperationException: ExecuteReader requires an open and available Connection. The connection's current state is open.

qnakjoqk

qnakjoqk1#

我知道自从我发布这个问题已经有几个月了,但我终于有时间来解决这个问题,并找到了解决方案。从本质上讲,在服务总线队列中批处理消息是我最终得到的解决方案。
这个解决方案取自这个GitHub文档页面,并进行了一些绑定修改:https://github.com/Azure/azure-sdk-for-net/blob/Microsoft.Azure.WebJobs.Extensions.ServiceBus_5.0.0-beta.2/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/README.md#batch-triggers

[FunctionName("TriggerBatch")]
public static void Run(
    [ServiceBusTrigger("<queue_name>", Connection = "<connection_name>", IsBatched = true)] string[] messages,
    ILogger logger)
{
    foreach (string message in messages)
        logger.LogInformation($"C# function triggered to process a message: {message}");
}

相关问题