我们有一个进程,需要按顺序处理记录,不是按接收记录的顺序,而是按从源系统接收的时间戳列(recdate)的顺序,并针对特定的otherid。
例如:对于otherid值1,我们收到3条消息,对于otherid值2,我们收到5条消息。我们有一个15秒的宽限期,在此期间,我们等待所有与一个otherid值有关的消息“稳定”,然后才有资格选择消息进行处理。因此,在这15秒内,可能会有多个针对特定otherid的消息累积,但所有消息都具有不同的recdate时间戳。我们将它们“烹饪”15秒,因为它们是无序的(但不会超过1-2秒),因此在我们开始选择特定otherid的最旧消息之前,要确保它们都在那里。
第一次调用函数选择要处理的消息,应该选择最早的消息,而不管otherid值。假设选择了otherid1的最早消息。然后,我们将otheridvalue 1存储到locktbl表中,因此在我们处理所选消息之前,不会选择该otherid的其他消息,然后通过从锁表中删除其值来解锁该otherid。我们还将所选消息的状态更改为“处理中”,因为只考虑状态为“新建”的消息。这可能看起来是多余的,因为消息的otherid已经被锁定,但我们决定这样做是为了加快在表中搜索下一个“New”消息的速度。同时,otherid2或其他otherid的最旧消息可以被其他线程选中,一旦选中,otherid也会被锁定,依此类推。
因为这都是多线程的,所以消息到达时是无序的。此外,有时会有数百个线程同时调用该函数来选择要处理的下一条消息。
该过程在大多数情况下都能正常工作,但有些类型的消息到达彼此非常接近,并且对于同一个otherid,在这些情况下,我们有时会选择不按顺序处理同一个otherid的消息。选择较新的消息,并在选择要处理的较旧消息之前锁定otherid,然后当然会等待较旧的消息,并在之后进行处理。
create table tbl1 (
recid bigint not null
,otherid bigint not null
,statuscode character varying(16) COLLATE pg_catalog."default" DEFAULT 'New'::character varying
,recdate timestamp(6) with time zone not null
,msgbody text COLLATE pg_catalog."default" NOT NULL
)
create table locktbl (
otherid bigint not null,
lockdate timestamp(6) with time zone not null
)
insert into tbl1 (recid,otherid,statuscode,recdate,msgbody)
values
(1,1,'New','2023-05-04 13:16:42.004111+00','body1')
,(2,1,'New','2023-05-04 13:16:42.00521+00','body2')
CREATE OR REPLACE FUNCTION public.fn_getnextmessage(
)
RETURNS TABLE(ret_recid bigint, ret_otherid bigint, ret_msgbody text)
LANGUAGE 'plpgsql'
COST 100
VOLATILE PARALLEL SAFE
ROWS 1000
AS $BODY$
declare
_recid bigint := null;
_otherid bigint := null;
BEGIN
SELECT
INTO _recid, _otherid
t1.recid, t1.otherid
FROM tbl1 t1
WHERE t1.recdate < (now() - interval '15 second')
AND t1.statuscode = 'New'
AND NOT EXISTS (select 1 from locktbl l where l.otherid = t1.otherid)
ORDER BY t1.recdate
-- FOR UPDATE SKIP LOCKED
LIMIT 1;
IF _recid > 0 THEN
INSERT INTO locktbl (otherid, lockdate)
VALUES (_otherid, now());
UPDATE tbl1 t1
SET statuscode = 'InProcess'
WHERE t1.recid = _recid;
RETURN QUERY
SELECT
t1.recid
,t1.otherid
,t1.msgbody
FROM tbl1 as t1
WHERE t1.recid = _recid;
ELSE
RETURN QUERY
SELECT
t1.recid
,t1.otherid
,t1.msgbody
FROM tbl1 as t1
LIMIT 0;
END IF;
END;
$BODY$;
表locktbl对otherid列有唯一约束。我们尝试了使用FOR UPDATE SKIP LOCKED或不使用它,但仍然无法按照recdate的顺序选择记录。我们不担心是否会出现阻塞错误或唯一密钥冲突错误,我们可以在应用程序代码中处理这些错误。我们只需要他们不要被无序地选中。
有什么建议吗?谢谢
1条答案
按热度按时间kknvjkwl1#
您似乎要跳过很多圈才能从队列中选择下一个项目。稍微修改一下模式,就可以在1条语句中完成此操作。并且如果需要(但不是必需的),该语句可以 Package 在函数中。
首先去掉
locktbl
表,将lockdate
列移到tbl1
。您可以让它默认为null,但如果您想避免Null
值,则将默认值设置为-infinity
。参见demo here。我假设列
tbl1.recid
是唯一的,如果不是,你应该创建一个唯一的或PK,并在上面的查询中使用。注意:我将函数标记为parallel unsafe而不是parallel safe。参见Documentation
函数和聚合必须标记为PARALLEL UNSAFE,如果它们写入数据库,访问序列,更改事务状态...