postgresql 如何在函数内部强制COMMIT,以便其他会话可以看到更新的行?

nc1teljy  于 2023-05-06  发布在  PostgreSQL
关注(0)|答案(2)|浏览(296)

在Postgres 12数据库中,我在一个函数中有多个查询(SELECTUPDATE,...),总共需要大约20分钟才能完成。如果status没有运行,我在顶部有一个检查,它会执行UPDATE

create or replace function aaa.fnc_work() returns varchar as 
$body$
    begin
        if (select count(*) from aaa.monitor where id='invoicing' and status='running')=0 then
           return 'running';
        else
           update aaa.monitor set status='running' where id='invoicing';
        end if;
        --- rest of code ---
        --finally
        update aaa.monitor set status='idle' where id='invoicing';
        return '';
    exception when others then
         return SQLERRM::varchar;
    end
$body$
language plpgsql;

这个想法是为了防止其他用户执行--- rest of code ---,直到status空闲。
但是,其他人(调用相同的函数)似乎没有看到更新的状态,他们也继续执行--- rest of code ---。如何在以下情况下强制提交:
update aaa.monitor set status='running' where id='invoicing' ;
以便所有其他用户会话可以看到更新的status并相应地退出。
我需要交易吗?

inb24sb2

inb24sb21#

继续阅读我把最好的留到最后。

PROCEDURE概念验证

Postgres FUNCTION始终是原子的(在单个事务 Package 器中运行),不能处理事务。所以COMMIT不允许。您可以使用dblink的技巧来解决这个问题。参见:

  • Postgres支持嵌套或自治事务吗?
  • 如何在PostgreSQL中进行大型非阻塞更新?

但是对于像这样的嵌套事务,可以考虑使用**PROCEDURE。* *Postgres 11您可以在这里管理交易:

CREATE OR REPLACE PROCEDURE aaa.proc_work(_id text, INOUT _result text = NULL)
  LANGUAGE plpgsql AS
$proc$
BEGIN
   -- optionally assert steering row exists
   PERFORM FROM aaa.monitor WHERE id = _id FOR KEY SHARE SKIP LOCKED;

   IF NOT FOUND THEN   
      RAISE EXCEPTION 'monitor.id = % not found or blocked!', quote_literal(_id);
   END IF;

   -- try UPDATE
   UPDATE aaa.monitor
   SET    status = 'running'
   WHERE  id = _id                   -- assuming valid _id
   AND    status <> 'running';       -- assuming "status" is NOT NULL

   IF NOT FOUND THEN
      _result := 'running'; RETURN;  -- this is how you return with INOUT params
   END IF;

   COMMIT;                           -- HERE !!!

   BEGIN                             -- start new code block

      ----- code for big work HERE -----
      -- PERFORM 1/0;                -- debug: test exception?
      -- PERFORM pg_sleep(5);        -- debug: test concurrency?

      _result := '';

   -- also catching QUERY_CANCELED and ASSERT_FAILURE
   -- is a radical step to try and release 'running' rows no matter what
   EXCEPTION WHEN OTHERS OR QUERY_CANCELED OR ASSERT_FAILURE THEN
      -- ROLLBACK;                   -- roll back (unfinished?) big work
      _result := SQLERRM;
   END;                              -- end of nested block

   UPDATE aaa.monitor                -- final reset
   SET    status = 'idle'
   WHERE  id = _id
   AND    status <> 'idle';          -- only if needed
END
$proc$;

呼叫(重要!):

CALL aaa.proc_work('invoicing');  -- stand-alone call!

重要提示

UPDATE之后添加COMMIT。之后,并发事务可以看到更新的行。
但是没有额外的BEGINSTART TRANSACTIONThe manual:
CALL命令调用的过程以及匿名代码块(DO命令)中,可以使用COMMITROLLBACK命令结束事务。在使用这些命令结束事务后,会自动启动新的事务,因此没有单独的START TRANSACTION命令。(注意BEGINEND在PL/pgSQL中有不同的含义。
我们需要一个单独的PL/pgSQL code block,因为你有一个自定义的异常处理程序,并且(引用the manual):
事务不能在具有异常处理程序的块内结束。
(But我们可以在EXCEPTION处理程序中使用COMMIT/ROLLBACK。)
不能在外部事务中调用此过程,也不能与任何其他DML语句一起调用此过程,否则将强制使用外部事务 Package 。必须是独立的CALL。参见:

注意最后的UPDATE aaa.monitor SET status = 'idle' WHERE ...。否则(承诺!)status将在异常后无限期地保持“运行”。
关于从过程返回值:

  • 如何从存储过程(而不是函数)返回值?

我将DEFAULT NULL添加到INOUT参数中,因此您不必在调用时提供参数。
UPDATE直接。如果行处于“running”状态,则不进行更新。(这也是逻辑的逻辑:你的IF表达式似乎是向后的,因为当找到 no row with status='running'时,它返回'running'。你似乎想要相反的东西)。
我添加了一个(可选!)Assert以确保表aaa.monitor中的行存在。添加一个FOR KEY SHARE锁,以消除Assert和后面的UPDATE之间的竞争条件的微小时间窗口。锁定与删除或更新PK列冲突-但 * 不 * 与更新status冲突。所以在正常操作中不会引发异常!手册:
目前,UPDATE情况下考虑的列集是那些具有唯一索引的列,这些索引可以在外键中使用(因此不考虑部分索引和表达式索引),但将来可能会改变。
SKIP LOCK在锁冲突的情况下不等待。添加的异常永远不会发生。只是在演示防水概念验证。
您的更新在aaa.monitor中显示了25行,因此我添加了参数_id

上级法

上面的内容可能会让世界看到更多的信息。对于队列操作,有更有效的解决方案。使用代替,这对其他人来说是“可见的”。那么你不需要一个嵌套的事务开始,一个普通的FUNCTION就可以了:

CREATE OR REPLACE FUNCTION aaa.fnc_work(_id text)
  RETURNS text
  LANGUAGE plpgsql AS
$func$
BEGIN
   -- optionally assert that the steering row exists
   PERFORM FROM aaa.monitor WHERE id = _id FOR KEY SHARE SKIP LOCKED;
   IF NOT FOUND THEN   
      RAISE EXCEPTION 'monitor.id = % not found or blocked!', quote_literal(_id);
   END IF;

   -- lock row
   PERFORM FROM aaa.monitor WHERE id = _id FOR NO KEY UPDATE SKIP LOCKED;

   IF NOT FOUND THEN
      -- we made sure the row exists, so it must be locked
      RETURN 'running';
   END IF;

   ----- code for big work HERE -----
   -- PERFORM 1/0;                -- debug: test exception?
   -- PERFORM pg_sleep(5);        -- debug: test concurrency?

   RETURN '';

EXCEPTION WHEN OTHERS THEN
   RETURN SQLERRM;

END
$func$;

电话:

SELECT aaa.fnc_work('invoicing');

调用可以以任何您想要的方式嵌套。只要一个事务正在处理大任务,就不会启动其他事务。
同样,可选的assert取出一个FOR KEY SHARE锁,以消除竞争条件的时间窗口,并且添加的异常在正常操作中永远不会发生。
我们根本不需要status列。行锁本身就是看门人。因此,PERFORM FROM aaa.monitor ...中的SELECT列表为空。附带利益:这也不会通过来回更新行而产生死元组。如果您出于其他原因仍然需要更新status,那么您就回到了上一章的可见性问题。你可以把两者结合起来…
关于PERFORM

  • PL/pgSQL函数中的SELECT或PERFORM

关于行锁:

jgwigjjp

jgwigjjp2#

你试图完成的是一个自治事务。PostgreSQL没有一个简单的方法来做到这一点。此链接here讨论了一些替代方案。
但是,上面链接的文章中讨论的一种方法是使用PostgreSQL dblink扩展。
您需要将扩展添加到服务器

CREATE EXTENSION dblink;

然后你就可以创建一个新的函数,这个函数可以在你的函数中调用

CREATE FUNCTION update_monitor_via_dblink(msg text)
 RETURNS void
 LANGUAGE sql
AS $function$
   select dblink('host=/var/run/postgresql port=5432 user=postgres dbname=postgres',
    format(' update aaa.monitor set status= %M',msg::text)
$function$;

您可能需要考虑的另一件事是使用PostgreSQL锁。更多信息请参见here

相关问题