postgresql 多个应用程序线程未按顺序选择Postgres记录

eiee3dmh  于 2023-05-06  发布在  PostgreSQL
关注(0)|答案(1)|浏览(109)

我们有一个进程,需要按顺序处理记录,不是按接收记录的顺序,而是按从源系统接收的时间戳列(recdate)的顺序,并针对特定的otherid
例如:对于otherid值1,我们收到3条消息,对于otherid值2,我们收到5条消息。我们有一个15秒的宽限期,在此期间,我们等待所有与一个otherid值有关的消息“稳定”,然后才有资格选择消息进行处理。因此,在这15秒内,可能会有多个针对特定otherid的消息累积,但所有消息都具有不同的recdate时间戳。我们将它们“烹饪”15秒,因为它们是无序的(但不会超过1-2秒),因此在我们开始选择特定otherid的最旧消息之前,要确保它们都在那里。
第一次调用函数选择要处理的消息,应该选择最早的消息,而不管otherid值。假设选择了otherid1的最早消息。然后,我们将otheridvalue 1存储到locktbl表中,因此在我们处理所选消息之前,不会选择该otherid的其他消息,然后通过从锁表中删除其值来解锁该otherid。我们还将所选消息的状态更改为“处理中”,因为只考虑状态为“新建”的消息。这可能看起来是多余的,因为消息的otherid已经被锁定,但我们决定这样做是为了加快在表中搜索下一个“New”消息的速度。同时,otherid2或其他otherid的最旧消息可以被其他线程选中,一旦选中,otherid也会被锁定,依此类推。
因为这都是多线程的,所以消息到达时是无序的。此外,有时会有数百个线程同时调用该函数来选择要处理的下一条消息。
该过程在大多数情况下都能正常工作,但有些类型的消息到达彼此非常接近,并且对于同一个otherid,在这些情况下,我们有时会选择不按顺序处理同一个otherid的消息。选择较新的消息,并在选择要处理的较旧消息之前锁定otherid,然后当然会等待较旧的消息,并在之后进行处理。

create table tbl1 (
    recid bigint not null
    ,otherid bigint not null
    ,statuscode character varying(16) COLLATE pg_catalog."default" DEFAULT 'New'::character varying
    ,recdate timestamp(6) with time zone not null
    ,msgbody text COLLATE pg_catalog."default" NOT NULL
)

create table locktbl (
    otherid bigint not null, 
    lockdate timestamp(6) with time zone not null
)

insert into tbl1 (recid,otherid,statuscode,recdate,msgbody)
values
(1,1,'New','2023-05-04 13:16:42.004111+00','body1')
,(2,1,'New','2023-05-04 13:16:42.00521+00','body2')

CREATE OR REPLACE FUNCTION public.fn_getnextmessage(
    )
    RETURNS TABLE(ret_recid bigint, ret_otherid bigint, ret_msgbody text) 
    LANGUAGE 'plpgsql'
    COST 100
    VOLATILE PARALLEL SAFE 
    ROWS 1000

AS $BODY$
declare
    _recid bigint := null;
    _otherid bigint := null;
BEGIN
    SELECT 
    INTO _recid, _otherid
        t1.recid, t1.otherid
    FROM tbl1 t1
    WHERE t1.recdate < (now() - interval '15 second')
    AND t1.statuscode = 'New'
    AND NOT EXISTS (select 1 from locktbl l where l.otherid = t1.otherid)
    ORDER BY t1.recdate
    -- FOR UPDATE SKIP LOCKED
    LIMIT 1;

    IF _recid > 0 THEN 
        INSERT INTO locktbl (otherid, lockdate)
        VALUES (_otherid, now());

        UPDATE tbl1 t1
        SET statuscode = 'InProcess'
        WHERE t1.recid = _recid;            

        RETURN QUERY
        SELECT 
            t1.recid
            ,t1.otherid
            ,t1.msgbody
        FROM tbl1 as t1
        WHERE t1.recid = _recid;
    ELSE
        RETURN QUERY
        SELECT 
            t1.recid
            ,t1.otherid
            ,t1.msgbody
        FROM tbl1 as t1
        LIMIT 0;    
    END IF;
END;
$BODY$;

locktblotherid列有唯一约束。我们尝试了使用FOR UPDATE SKIP LOCKED或不使用它,但仍然无法按照recdate的顺序选择记录。我们不担心是否会出现阻塞错误或唯一密钥冲突错误,我们可以在应用程序代码中处理这些错误。我们只需要他们不要被无序地选中。
有什么建议吗?谢谢

kknvjkwl

kknvjkwl1#

您似乎要跳过很多圈才能从队列中选择下一个项目。稍微修改一下模式,就可以在1条语句中完成此操作。并且如果需要(但不是必需的),该语句可以 Package 在函数中。
首先去掉locktbl表,将lockdate列移到tbl1。您可以让它默认为null,但如果您想避免Null值,则将默认值设置为-infinity。参见demo here

create or replace
function fn_getnextmessage()
    returns table( ret_recid   bigint
                 , ret_otherid bigint
                 , ret_msgbody text
                 ) 
    language sql
    volatile 
    parallel unsafe
as $$
    update tbl1 
       set lockdate = now()
         , statuscode = 'InProgress'
     where recid = 
           (select recid
              from tbl1 t1  
             where t1.recdate < (now() - interval '15 second')
               and t1.statuscode = 'New'
               and lockdate = '-infinity'::timestamp(6) with time zone
             order by t1.recdate
             limit 1
           ) 
    returning recid, otherid, msgbody    
$$;

我假设列tbl1.recid是唯一的,如果不是,你应该创建一个唯一的或PK,并在上面的查询中使用。
注意:我将函数标记为parallel unsafe而不是parallel safe。参见Documentation
函数和聚合必须标记为PARALLEL UNSAFE,如果它们写入数据库,访问序列,更改事务状态...

相关问题