php 如何在laravel中防止特定作业的并发处理?

w3nuxt5m  于 2023-09-29  发布在  PHP
关注(0)|答案(1)|浏览(159)

我目前正在为一个应用程序实现一个钱包充值系统。我正在使用一个名为AdjustWalletBalance的作业来处理这个问题。该工作只是读取客户当前的钱包余额并将其增加一定的金额。下面是作业的代码:

use App\Events\Tenant\WalletTopupSuccessful;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldBeUnique;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\Middleware\WithoutOverlapping;
use Illuminate\Queue\SerializesModels;

class AdjustWalletBalance implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    /**
     * Create a new job instance.
     */
    public function __construct(
        /**
         * The amount to add to wallet balance. Can be negative for deductions.
         */
        public float $amount,
        /**
         * User who's wallet should be adjusted.
         */
        public User $user,
    )
    {
        $this->onQueue('wallets');
    }

    /**
     * Execute the job.
     */
    public function handle(): void
    {
        $process_id = mt_rand();
        $wallet = $user->wallet;
        \Log::debug("starting $process_id at " . now());
        \Log::debug("adjusting wallet balance with amount: {$this->amount}");
        \Log::debug("current balance for $process_id: {$wallet->balance}");

        // Increasing the running time for the job to make it easier to have 
        // two simultaneous jobs running for the sake of this demonstration.
        sleep(10);

        $wallet->balance = $wallet->balance + $this->amount;
        $wallet->save();
        \Log::debug("$process_id updated balance to {$wallet->balance}");
    }

    public function middleware()
    {
        return [(new WithoutOverlapping($this->user->id))];
    }

}

由于处理用户的钱包余额是一项非常敏感的任务,我不希望为特定用户运行多个作业示例,从而导致竞争条件和不正确的余额更新,因此我将WithoutOverlapping中间件添加到作业中,使用用户的id作为唯一密钥。但是它不起作用,即使我添加了中间件,我仍然可以为完全相同的用户并行执行作业。
我有一个这样的路由条目:

Route::get('/queue-test', function() {
    dispatch(new \App\Jobs\Tenant\AdjustWalletBalance(50, User::first());
});

我正在同时运行两个队列worker,如果我访问路由两次,我可以立即看到每个worker开始处理AdjustWalletBalance作业,这不应该是:

。当我检查日志文件以进一步验证作业的执行顺序时,我看到了以下内容:

[2023-09-24 19:12:42] local.DEBUG: starting 1007152283 at: 2023-09-24 19:12:42  
[2023-09-24 19:12:42] local.DEBUG: adjusting wallet balance with amount: 50  
[2023-09-24 19:12:42] local.DEBUG: current balance for 1007152283: 0  
[2023-09-24 19:12:43] local.DEBUG: starting 241490440 at: 2023-09-24 19:12:43
[2023-09-24 19:12:43] local.DEBUG: adjusting wallet balance with amount: 50  
[2023-09-24 19:12:43] local.DEBUG: current balance for 241490440: 0  
[2023-09-24 19:12:52] local.DEBUG: 1007152283 updated balance to 50  
[2023-09-24 19:12:53] local.DEBUG: 241490440 updated balance to 50

正如你可能已经注意到的,这是一个很大的问题。用户的钱包被记入两次#50,但由于这两个作业同时运行,用户的钱包余额被更新为#50而不是#100!
我已经将CACHE_DRIVER设置为array,所以我认为这不是配置问题。
我还尝试使用ShouldBeUnique合约并将用户的id设置为uniqueId,如下所示

class AdjustWalletBalance implements ShouldQueue, ShouldBeUnique
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    public function __construct(/**/)
    { /* Code */ }

    public function uniqueId(): int
    {
        return $this->user->id;
    }

    public function handle()
    { /* Code */ }
}

但我还是得到了同样的结果。
如果我杀死另一个队列worker,只留下一个运行,那么作业的后续示例将添加到队列中,用户的钱包将正确更新。但是应用程序将有数百个用户,我需要同时运行多个队列工作器来快速处理他们的钱包更新,因此,单个队列工作器不是一个选择。

n9vozmp4

n9vozmp41#

我把我的代码编辑成这样;

class AdjustWalletBalance extends ShouldQueue {

    /* Skipped unchanged parts of the code */

    public ?string $process_id;

    /**
     * Execute the job.
     */
    public function handle(): void
    {
        $process_id = mt_rand();
        $this->process_id = $process_id;

        // NOTE: Perform any queries relating to the row you wish to lock before you start
        // the transaction. Once the transaction begins, the same row will keep being 
        // returned regardless of any changes that occurred outside the transaction
        $wallet_id = $this->user->wallet->id;

        // P.S use DB::transaction() instead, S.O's syntax highlighting went 
        // wonky when I used it so I changed to beginTransaction instead
        \DB::beginTransaction();
            // Obtain lock on the current user's wallet. Now only our transaction can modify it.
            \DB::statement("select * from wallets where id = $wallet_id for update;");

            $wallet = $this->user->wallet;
            \Log::debug("starting $process_id at: " . now());
            \Log::debug("adjusting wallet balance with amount: {$this->amount}");
            \Log::debug("current balance for $process_id: {$wallet->balance}");
            sleep(10);

            $wallet->balance += $this->amount;
            $wallet->save();
        \DB::commit();
        $wallet = $this->user->wallet;
        \Log::debug("$process_id updated balance to {$wallet->balance}");
    }

    public function fail(\Throwable $th)
    {
        \DB::rollback(); // Remove this line if you're using DB::transaction
        \Log::debug("{$this->process_id} failed with exception:\n" . $th->getMessage());
    }

}

现在,检查我的日志文件可以得到:

[2023-09-25 01:31:53] local.DEBUG: starting 878985182 at: 2023-09-25 01:31:53  
[2023-09-25 01:31:53] local.DEBUG: adjusting wallet balance with amount: 50  
[2023-09-25 01:31:53] local.DEBUG: current balance for 878985182: 0  
[2023-09-25 01:32:03] local.DEBUG: starting 2126825516 at: 2023-09-25 01:32:03  
[2023-09-25 01:32:03] local.DEBUG: adjusting wallet balance with amount: 50  
[2023-09-25 01:32:03] local.DEBUG: current balance for 2126825516: 50  
[2023-09-25 01:32:03] local.DEBUG: 878985182 updated balance to 50  
[2023-09-25 01:32:13] local.DEBUG: 2126825516 updated balance to 100

成功!检查我的数据库也验证了用户的钱包已正确更新。
队列工作者的屏幕截图显示,每个工作者不再只花10秒:

这意味着MySQL不允许另一个worker获得用户钱包行的锁,直到另一个事务完成,从而消除了并行钱包更新。
促使我采用此解决方案的资源:

相关问题