如何从PostgreSQL函数返回临时表?

9nvpjoqh  于 2023-06-29  发布在  PostgreSQL
关注(0)|答案(1)|浏览(205)

我尝试使用PostgreSQL数据库作为运行一些模拟的简单作业队列。我已经有一段时间没有写SQL了,尽管我尽了最大的努力(使用ChatGPT,谷歌搜索无数的错误消息,阅读文档),我还是无法解决这个问题。
当运行这个函数时,我一直得到如下错误:`Array value must start with "{" or dimension information. malformed array literal: "1"`
我认为这是因为当它返回时,由于某种原因,它只返回job_id列。
我使用以下模式:

runner

-- Runners: the hosts that request jobs. Rows here are referenced by
-- job.attempted_by and job.completed_by.
CREATE TABLE runner (
    id           integer      NOT NULL,
    created_at   timestamptz  NOT NULL DEFAULT clock_timestamp(),
    last_seen_at timestamptz,
    alias        varchar(255) NOT NULL,
    hostname     varchar(255) NOT NULL
);

-- Sequence that supplies runner.id; owned by the column so it is dropped
-- together with it.
CREATE SEQUENCE runner_id_seq
    AS integer
    INCREMENT BY 1
    START WITH 1
    NO MINVALUE
    NO MAXVALUE
    CACHE 1;

ALTER SEQUENCE runner_id_seq
    OWNED BY runner.id;

-- Wire the sequence in as the id default and declare the primary key.
ALTER TABLE ONLY runner
    ALTER COLUMN id SET DEFAULT nextval('runner_id_seq'::regclass),
    ADD CONSTRAINT runner_pkey PRIMARY KEY (id);

job

-- Lifecycle states of a job. The label order defines the enum's sort order,
-- so it is kept exactly as declared.
CREATE TYPE public.job_status AS ENUM ('pending', 'failed', 'complete', 'running');

-- Queue of jobs; new rows start out as 'pending'.
CREATE TABLE job (
    id                integer           NOT NULL,
    created_at        timestamptz       NOT NULL DEFAULT clock_timestamp(),
    completed_at      timestamptz,
    status            public.job_status NOT NULL DEFAULT 'pending'::public.job_status,
    specification     jsonb             NOT NULL,
    upstream_manifest integer,
    completed_by      integer,
    attempted_by      integer
);

-- Sequence that supplies job.id; owned by the column so it is dropped
-- together with it.
CREATE SEQUENCE job_id_seq
    AS integer
    INCREMENT BY 1
    START WITH 1
    NO MINVALUE
    NO MAXVALUE
    CACHE 1;

ALTER SEQUENCE job_id_seq
    OWNED BY job.id;

-- Id default, primary key, and the two references to runner(id).
ALTER TABLE ONLY job
    ALTER COLUMN id SET DEFAULT nextval('job_id_seq'::regclass),
    ADD CONSTRAINT job_pkey PRIMARY KEY (id),
    ADD CONSTRAINT job_attempted_by_fkey FOREIGN KEY (attempted_by) REFERENCES runner(id),
    ADD CONSTRAINT job_completed_by_fkey FOREIGN KEY (completed_by) REFERENCES runner(id);

填充初始数据(种子数据)

-- One runner to request work ...
INSERT INTO runner (alias, hostname)
VALUES ('test01', 'test-host');

-- ... and one job for it to pick up (status falls back to the column default).
INSERT INTO job (specification)
VALUES (CAST('{"spec": true}' AS jsonb));

request_jobs函数

-- request_jobs(runner_alias, total_jobs): intended to hand up to total_jobs
-- pending jobs to the runner with the given alias and return them as
-- (job_id, specification) rows. Planned steps:
-- 1. Get runner ID by alias
-- 2. Update runner last seen time (trigger?) -- not implemented in the body below
-- 3. Select up to total_jobs jobs whose status is 'pending'
-- 4. Set the foreign key of the runner who's attempting this job
--
-- NOTE: this is the failing version that produces the
-- 'malformed array literal: "1"' error; it is kept as posted.
CREATE OR REPLACE FUNCTION public.request_jobs(runner_alias text, total_jobs integer)
RETURNS TABLE(job_id integer, specification json) AS $$
DECLARE
    -- Id of the runner looked up from runner_alias (stays NULL when no row matches).
    runner_id integer;
    -- Array of the job table's row type. This is a variable, not a relation.
    selected_jobs job[];
BEGIN
    -- Nested PL/pgSQL block: BEGIN ... END here only groups statements,
    -- it does not open a transaction.
    BEGIN
        -- Retrieve the runner_id based on the runner_alias
        SELECT id INTO runner_id FROM runner WHERE alias = runner_alias;

        -- Lock up to total_jobs pending jobs, skipping rows already locked by
        -- other sessions, and try to store them in selected_jobs.
        -- NOTE(review): the INTO target is the single array variable while the
        -- query yields two columns per row; the reported error is consistent
        -- with the first column (the job id, e.g. 1) being assigned to that
        -- job[] variable.
        SELECT j.id as job_id, j.specification
        FROM job AS j
        WHERE status = 'pending'
        LIMIT total_jobs
        FOR UPDATE SKIP LOCKED
        INTO selected_jobs;

        -- Update the selected jobs to set their status to 'running' and assign them to the runner
        -- NOTE(review): selected_jobs is an array variable but is used in FROM
        -- as if it were a table, and the unqualified job_id also matches the
        -- function's output column of the same name -- confirm before reuse.
        UPDATE job
        SET attempted_by = runner_id,
            status = 'running'
        WHERE id IN (SELECT job_id FROM selected_jobs);

        -- Return the selected jobs
        -- NOTE(review): same table-style use of the array variable as above.
        RETURN QUERY SELECT s.job_id, s.specification FROM selected_jobs as s;
    END;

END;
$$ LANGUAGE plpgsql;

我试过用FOR循环(RETURN NEXT)重写它,也试过使用 RETURNS SETOF job,以及各种其他调整,但到目前为止,我还是没能解决这个问题。

ndh0cuux

ndh0cuux1#

最后,我通过使用临时表的方法来修复它:

-- request_jobs: claim up to total_jobs pending jobs for a runner and return them.
--
-- Parameters:
--   runner_alias  alias of the requesting runner (matched against api.runner.alias)
--   total_jobs    maximum number of jobs to claim in this call
-- Returns:
--   one (job_id, specification) row per job claimed by this call, ordered by
--   job id; no rows when nothing is pending or every pending row is locked.
-- Raises:
--   an exception when no runner with the given alias exists, instead of
--   marking jobs as running with attempted_by = NULL.
-- Side effects:
--   claimed jobs get status = 'running' and attempted_by = the runner's id;
--   a session-local temporary table selected_jobs is created (dropped at commit).
CREATE OR REPLACE FUNCTION public.request_jobs(runner_alias text, total_jobs integer)
RETURNS TABLE(job_id integer, specification json) AS $$
DECLARE
    v_runner_id integer;
BEGIN
    -- A PL/pgSQL function always runs inside the caller's transaction, so no
    -- transaction is (or can be) started here.

    -- Resolve the runner; fail loudly if the alias is unknown.
    SELECT r.id INTO v_runner_id FROM api.runner AS r WHERE r.alias = runner_alias;
    IF NOT FOUND THEN
        RAISE EXCEPTION 'request_jobs: no runner with alias "%"', runner_alias;
    END IF;

    -- Scratch table holding the jobs claimed by this call. IF NOT EXISTS plus
    -- TRUNCATE lets the function be called more than once in one transaction:
    -- ON COMMIT DROP only removes the table at commit, so a plain CREATE would
    -- fail on the second call.
    CREATE TEMPORARY TABLE IF NOT EXISTS selected_jobs (job_id integer, specification json)
    ON COMMIT DROP;
    TRUNCATE pg_temp.selected_jobs;

    -- Pick the oldest pending jobs (lowest id first) and lock them. SKIP LOCKED
    -- makes concurrent callers pass over rows another session is claiming
    -- instead of waiting on them. The cast makes the conversion to the json
    -- return type explicit.
    INSERT INTO selected_jobs (job_id, specification)
    SELECT j.id, j.specification::json
    FROM api.job AS j
    WHERE j.status = 'pending'
    ORDER BY j.id
    LIMIT total_jobs
    FOR UPDATE SKIP LOCKED;

    -- Mark the claimed jobs as running and record who is attempting them.
    UPDATE api.job AS j
    SET attempted_by = v_runner_id,
        status = 'running'
    FROM selected_jobs AS s
    WHERE j.id = s.job_id;

    -- Return the claimed jobs.
    RETURN QUERY
    SELECT s.job_id, s.specification
    FROM selected_jobs AS s
    ORDER BY s.job_id;
END;
$$ LANGUAGE plpgsql;

相关问题