我尝试使用PostgreSQL数据库作为运行一些模拟的简单作业队列。我已经有一段时间没有写SQL了,尽管我尽了最大的努力(使用ChatGPT,谷歌搜索无数的错误消息,阅读文档),我还是无法解决这个问题。
当运行这个函数时,我一直得到Array value must start with "{" or dimension information. malformed array literal: "1"
我认为这是因为当它返回时,由于某种原因,它只返回job_id
列。
我使用以下模式:
runner
CREATE TABLE runner (
id integer NOT NULL,
created_at timestamp with time zone DEFAULT clock_timestamp() NOT NULL,
last_seen_at timestamp with time zone,
alias character varying(255) NOT NULL,
hostname character varying(255) NOT NULL
);
CREATE SEQUENCE runner_id_seq
AS integer
START WITH 1
INCREMENT BY 1
NO MINVALUE
NO MAXVALUE
CACHE 1;
ALTER SEQUENCE runner_id_seq OWNED BY runner.id;
ALTER TABLE ONLY runner ALTER COLUMN id SET DEFAULT nextval('runner_id_seq'::regclass);
ALTER TABLE ONLY runner ADD CONSTRAINT runner_pkey PRIMARY KEY (id);
作业
CREATE TYPE public.job_status AS ENUM (
'pending',
'failed',
'complete',
'running'
);
CREATE TABLE job (
id integer NOT NULL,
created_at timestamp with time zone DEFAULT clock_timestamp() NOT NULL,
completed_at timestamp with time zone,
status public.job_status DEFAULT 'pending'::public.job_status NOT NULL,
specification jsonb NOT NULL,
upstream_manifest integer,
completed_by integer,
attempted_by integer
);
CREATE SEQUENCE job_id_seq
AS integer
START WITH 1
INCREMENT BY 1
NO MINVALUE
NO MAXVALUE
CACHE 1;
ALTER SEQUENCE job_id_seq OWNED BY job.id;
ALTER TABLE ONLY job ALTER COLUMN id SET DEFAULT nextval('job_id_seq'::regclass);
ALTER TABLE ONLY job
ADD CONSTRAINT job_pkey PRIMARY KEY (id);
ALTER TABLE ONLY job
ADD CONSTRAINT job_attempted_by_fkey FOREIGN KEY (attempted_by) REFERENCES runner(id);
ALTER TABLE ONLY job
ADD CONSTRAINT job_completed_by_fkey FOREIGN KEY (completed_by) REFERENCES runner(id);
播种表
INSERT INTO runner (alias, hostname) VALUES ('test01', 'test-host');
INSERT INTO job (specification) VALUES ('{"spec": true}'::jsonb);
request_jobs函数
-- 1. Get runner ID by alias
-- 2. Update runner last seen time (trigger?)
-- 3. Select 'pending' total_jobs limit total_jobs
-- 4. Set the foreign key of the runner who's attempting this job
CREATE OR REPLACE FUNCTION public.request_jobs(runner_alias text, total_jobs integer)
RETURNS TABLE(job_id integer, specification json) AS $$
DECLARE
runner_id integer;
selected_jobs job[];
BEGIN
-- Start a transaction
BEGIN
-- Retrieve the runner_id based on the runner_alias
SELECT id INTO runner_id FROM runner WHERE alias = runner_alias;
-- Select the first total_jobs pending jobs to assign them to the runner
SELECT j.id as job_id, j.specification
FROM job AS j
WHERE status = 'pending'
LIMIT total_jobs
FOR UPDATE SKIP LOCKED
INTO selected_jobs;
-- Update the selected jobs to set their status to 'running' and assign them to the runner
UPDATE job
SET attempted_by = runner_id,
status = 'running'
WHERE id IN (SELECT job_id FROM selected_jobs);
-- Return the selected jobs
RETURN QUERY SELECT s.job_id, s.specification FROM selected_jobs as s;
END;
END;
$$ LANGUAGE plpgsql;
我试过用FOR循环(RETURN NEXT)重写它,使用RETURNS SETOF作业,以及各种其他调整,但到目前为止,我还没有能够破解它。
1条答案
按热度按时间ndh0cuux1#
最后,我通过使用临时表的方法来修复它: