在mysql中使用并发worker进行sql原子读取和更新

eit6fx6z  于 2021-07-26  发布在  Java
关注(0)|答案(2)|浏览(433)

假设我有多个worker可以同时对一个mysql表进行读写(例如。 jobs ). 每个工人的任务是:
找到最老的 QUEUED 工作
将其状态设置为 RUNNING 返回相应的id。
请注意,可能没有任何限定(即。 QUEUED )工作人员运行步骤1时的作业。
到目前为止,我有以下伪代码。我想我需要取消( ROLLBACK )如果第1步没有返回任何作业,则返回事务。在下面的代码中我将如何做到这一点?

  1. BEGIN TRANSACTION;
  2. # Update the status of jobs fetched by this query:
  3. SELECT id from jobs WHERE status = "QUEUED"
  4. ORDER BY created_at ASC LIMIT 1;
  5. # Do the actual update, otherwise abort (i.e. ROLLBACK?)
  6. UPDATE jobs
  7. SET status="RUNNING"
  8. # HERE: Not sure how to make this conditional on the previous ID
  9. # WHERE id = <ID from the previous SELECT>
  10. COMMIT;
c9qzyr3d

c9qzyr3d1#

我这周正在实施一个和你的案子非常相似的计划。一组工人,每个人抓住一组行中的“下一行”进行工作。
伪代码是这样的:

  1. BEGIN;
  2. SELECT ID INTO @id FROM mytable WHERE status = 'QUEUED' LIMIT 1 FOR UPDATE;
  3. UPDATE mytable SET status = 'RUNNING' WHERE id = @id;
  4. COMMIT;

使用 FOR UPDATE 重要的是要避免比赛条件,即一个以上的工人试图抢同一排。
看到了吗https://dev.mysql.com/doc/refman/8.0/en/select-into.html 有关 SELECT ... INTO .

wgx48brx

wgx48brx2#

你想要什么还不太清楚。但假设你的任务是:找到下一个 QUEUED 工作。将其状态设置为 RUNNING 并选择相应的id。
在单线程环境中,只需使用代码即可。将所选id提取到应用程序代码中的变量中,并将其传递给where子句中的update查询。您甚至不需要事务,因为只有一个书面语句。您可以在sqlscript中进行模拟。
假设这是您当前的状态:

  1. | id | created_at | status |
  2. | --- | ------------------- | -------- |
  3. | 1 | 2020-06-15 12:00:00 | COMLETED |
  4. | 2 | 2020-06-15 12:00:10 | QUEUED |
  5. | 3 | 2020-06-15 12:00:20 | QUEUED |
  6. | 4 | 2020-06-15 12:00:30 | QUEUED |

您想启动下一个排队作业(id=2)。

  1. SET @id_for_update = (
  2. SELECT id
  3. FROM jobs
  4. WHERE status = 'QUEUED'
  5. ORDER BY id
  6. LIMIT 1
  7. );
  8. UPDATE jobs
  9. SET status="RUNNING"
  10. WHERE id = @id_for_update;
  11. SELECT @id_for_update;

你会得到

  1. @id_for_update
  2. 2

从最后一个选择。表将具有以下状态:

  1. | id | created_at | status |
  2. | --- | ------------------- | -------- |
  3. | 1 | 2020-06-15 12:00:00 | COMLETED |
  4. | 2 | 2020-06-15 12:00:10 | RUNNING |
  5. | 3 | 2020-06-15 12:00:20 | QUEUED |
  6. | 4 | 2020-06-15 12:00:30 | QUEUED |

db fiddle视图
如果有多个启动作业的进程,则需要使用 FOR UPDATE . 但这可以避免使用 LAST_INSERT_ID() :
从上述状态开始,作业2已在运行:

  1. UPDATE jobs
  2. SET status = 'RUNNING',
  3. id = LAST_INSERT_ID(id)
  4. WHERE status = 'QUEUED'
  5. ORDER BY id
  6. LIMIT 1;
  7. SELECT LAST_INSERT_ID();

您将获得:

  1. | LAST_INSERT_ID() | ROW_COUNT() |
  2. | ---------------- | ----------- |
  3. | 3 | 1 |

新的状态是:

  1. | id | created_at | status |
  2. | --- | ------------------- | -------- |
  3. | 1 | 2020-06-15 12:00:00 | COMLETED |
  4. | 2 | 2020-06-15 12:00:10 | RUNNING |
  5. | 3 | 2020-06-15 12:00:20 | RUNNING |
  6. | 4 | 2020-06-15 12:00:30 | QUEUED |

db fiddle视图
如果update语句不影响任何行(没有排队的行) ROW_COUNT()0 .
可能会有一些风险,我不知道-但这也不是我真正将如何处理这一点。我宁愿把更多的信息储存在 jobs table。简单示例:

  1. CREATE TABLE jobs (
  2. id INT auto_increment primary key,
  3. created_at timestamp not null default now(),
  4. updated_at timestamp not null default now() on update now(),
  5. status varchar(50) not null default 'QUEUED',
  6. process_id varchar(50) null default null
  7. );

  1. UPDATE jobs
  2. SET status = 'RUNNING',
  3. process_id = 'some_unique_pid'
  4. WHERE status = 'QUEUED'
  5. ORDER BY id
  6. LIMIT 1;

现在一个正在运行的作业属于一个特定的进程,您只需使用

  1. SELECT * FROM jobs WHERE process_id = 'some_unique_pid';

你甚至想知道更多的信息。 queued_at , started_at , finished_at .

展开查看全部

相关问题