azure 如何在水平扩展写入时避免并发问题?

ruoxqz4g  于 2022-12-14  发布在  其他
关注(0)|答案(6)|浏览(150)

假设有一个工作者服务从队列接收消息,从文档数据库读取具有指定ID的产品,根据消息应用一些操作逻辑,最后将更新的产品写回到数据库(a)。

当处理不同的产品时,这项工作可以安全地并行完成,因此我们可以横向扩展(b)。然而,如果多个服务示例在同一个产品上工作,我们可能会以并发问题或来自数据库的并发异常结束,在这种情况下,我们应该应用一些重试逻辑(重试仍然可能再次失败,依此类推)。

问题:我们如何避免这种情况?是否有方法可以确保两个示例不在同一个产品上工作?
示例/使用案例:一家在线商店在一小时内结束了对产品A、产品B和产品C的大销售,有数百名客户在购买。对于每次购买,都会有一条消息排队(产品ID、商品数量、价格)。目标:如何运行工作服务的三个示例,并确保productA的所有消息都将在instanceA、productB到instanceB以及productC到instanceC中结束(不会导致并发问题)?
注意:我的服务是用C#编写的,作为工作者角色托管在Azure上,我使用Azure队列进行消息传递,我正在考虑使用Mongo进行存储。此外,实体ID是GUID

它更多的是关于技术/设计,所以如果你使用不同的工具来解决问题,我仍然感兴趣。

xu3bshqb

xu3bshqb1#

任何试图将负载分配给同一集合中不同项目(如订单)的解决方案都注定要失败。原因是,如果您的事务流量很高,则必须开始执行以下操作之一:
1.让节点彼此对话(hey guys, are anyone working with this?
1.将ID生成划分为段(节点a创建ID 1-1000,节点B创建ID 1001-1999)等,然后让它们处理自己的段
1.动态地将集合划分为片段(并让每个节点处理片段)。
那么这些方法有什么问题呢?
第一种方法是简单地复制数据库中的事务,除非你能花大量的时间来优化策略,否则最好还是依赖事务。
后两个选项会降低性能,因为您必须根据id动态路由消息,并且在运行时更改策略以包括新插入的消息。

溶液

这里有两种解决方案,您也可以将它们合并。

自动重试

相反,您在某处有一个入口点,用于读取消息队列。
在它里面你有这样的东西:

while (true)
{
    var message = queue.Read();
    Process(message);
}

要获得非常简单的容错,您可以做的是在失败时重试:

while (true)
{
    for (i = 0; i < 3; i++)
    {
       try
       {
            var message = queue.Read();
            Process(message);
            break; //exit for loop
       }
       catch (Exception ex)
       {
           //log
           //no throw = for loop runs the next attempt
       }
    }
}

当然,您可以只捕获数据库异常(或者更确切地说是事务失败),以便重放这些消息。

微服务

我知道,微服务是一个时髦的词。但在这种情况下,它是一个很好的解决方案。与其拥有一个处理所有消息的单一核心,不如将应用程序划分为更小的部分。或者在您的情况下,只需停止处理某些类型的消息。
如果有五个节点运行应用程序,则可以确保节点A接收与订单相关的消息,节点B接收与发货等相关的消息。
通过这样做,您仍然可以水平扩展应用程序,不会出现冲突,而且只需很少的工作(多几个消息队列并重新配置每个节点)。

rkttyhzu

rkttyhzu2#

对于这种情况,我使用blob租约。基本上,我使用某个已知存储帐户中的实体ID创建一个blob。(如果blob不存在,则创建blob本身)。如果这两项操作都成功,然后允许处理消息。2总是在之后释放租用。3如果不成功,我会将消息转储回队列
我遵循Steve马克思最初在此处描述的方法http://blog.smarx.com/posts/managing-concurrency-in-windows-azure-with-leases,但进行了调整以使用新的存储库
在注解后编辑:如果你有一个潜在的高比率的消息都交谈同一个实体(如您的建议暗示),我会重新设计您的方法在某处..无论是实体结构,或消息结构。
例如:考虑CQRS设计模式,并独立地存储来自每个消息处理的更改。因此,产品实体现在是由各个工作者对实体所做的所有更改的集合,这些更改顺序地被重新应用和再水合到单个对象中

fcy6dtqo

fcy6dtqo3#

如果您希望数据库始终保持最新,并且始终与已处理的单元保持一致,那么您可以对同一可变实体进行多次更新。
为了符合此要求,您需要序列化同一实体的更新。您可以通过在生成器上对数据进行分区来实现此目的,也可以在同一队列上累积实体的事件,还可以使用分布式锁或数据库级锁来锁定工作进程中的实体。
您可以使用一个actor模型(在java/scala环境中使用akka),为每个实体或实体组创建一个消息队列,以串行处理它们。
更新您可以尝试akka port to .nethere。在这里您可以找到一个很好的教程,其中有关于使用akka in scala的示例。但是对于一般原则,您应该搜索更多关于[actor model]的内容。尽管如此,它也有缺点。
最后,它涉及到对数据进行分区以及为特定实体创建唯一的专用工作进程(在出现故障时可以重用和/或重新启动)的能力。

z9zf31ra

z9zf31ra4#

我假设您有一种方法可以跨所有工作服务安全地访问产品队列。

// Queue[X] is the queue for product X
// QueueMain is the main queue 
DoWork(ProductType X)
{
  if (Queue[X].empty())
  {
    product = QueueMain().pop()
    if (product.type != X)
    {
      Queue[product.type].push(product) 
      return;
    }
  }else
  {
     product = Queue[X].pop()
  }

  //process product...
}

对队列的访问需要是原子的

dsekswqp

dsekswqp5#

您应该使用启用会话的服务总线队列进行排序和并发。

xdnvmnnf

xdnvmnnf6#

1)我能想到的每一个高规模数据解决方案都有内置的东西来精确地处理这种冲突。细节将取决于您对数据存储的最终选择。在传统关系数据库的情况下,这是现成的,不需要您做任何额外的工作。参考您所选择的技术的文档以获得适当的细节。
2)了解您的数据模型和使用模式。合理设计您的数据存储。不要针对您不具备的规模进行设计。针对您最常见的使用模式进行优化。
3)挑战你的假设。你真的 * 必须 * 频繁地从多个角色中改变同一个实体吗?有时答案是肯定的,但通常你可以简单地创建一个新的实体来反映更新。也就是说,采取日志/logging的方法,而不是单一实体的方法。最终,单一实体上的大量更新永远不会扩展。

相关问题