Go语言 需要帮助实现从SQL数据库中挑选作业的调度程序

z9zf31ra  于 2023-05-11  发布在  Go
关注(0)|答案(3)|浏览(95)

我有一个SQL表,其中充满了第一天的挂单。在第二天的特定时间,我必须从DB中获取“PENDING”订单,并通过调用外部API来处理每个订单。这就是我的代码流看起来的样子:
SELECT * FROM orders where status = 'PENDING' LIMIT 200
现在,我将为每个订单调用外部API,该API将返回每个订单的成功或失败,然后我将在DB中更新订单状态。
UPDATE orders SET status = 'COMPLETED' WHERE id = ANY(<success list>)
UPDATE orders SET status = 'FAILED' WHERE id = ANY(<failure list>)
以上流程将继续运行多次,直到select查询返回0行。为了避免内存问题和外部API的吞吐能力,我在查询中设置了一个LIMIT。
现在,上面的流程有几个问题:
1.假设我的代码执行了SELECT查询并开始处理订单。如果我的服务在这期间崩溃了怎么办?会有一些订单会通过API,并会收到通过或失败的响应。但是我错过了在DB中更新它们的状态,因此当我的服务再次启动时,它将再次选择这些订单并再次处理它们,这是我不想要的。
1.我的服务可以从多个示例运行,因此状态= 'PENDING'的相同订单可以由不同的示例选择,从而导致同一订单的双重处理。如何避免这一点?
如果有帮助,我的技术堆栈是Go和PostgreSQL。我相信以上是一些常见的问题,必须有一些标准的方法来处理它们。我愿意改变任何部分,无论是Go代码还是DB更改,其中可能包括锁或事务。我只想向哪个方向去寻找答案。任何帮助将不胜感激。

pu82cl6c

pu82cl6c1#

而不是

SELECT * FROM orders WHERE status = 'PENDING' LIMIT 200

做了

WITH cte AS (
    -- this is needed only because PostgreSQL doesn't
    -- support the LIMIT clause in UPDATE queries
    SELECT * FROM orders WHERE status = 'PENDING' LIMIT 200
)

-- use UPDATE-RETURNING to first change
-- the status and then retrieve the rows
UPDATE orders AS o SET status='PROCESSING'

-- you can use this column, together with the status column,
-- to later determine if some orders were left in limbo because
-- of a crash *after* they were sent off to the processor
, updated_at=now()

-- use the CTE result to limit the update to
-- the 200 rows selected in the WITH query above
FROM cte WHERE o.id=cte.id

-- make sure that this worker ignores rows that
-- were, in the meantime, updated by another worker
--
-- this works because the workers will re-check
-- the WHERE condition after the lock is released,
-- (see the linked answer below)
AND o.status='PENDING'

-- return the matched & updated set/subset of rows
RETURNING o.*;

https://stackoverflow.com/a/11769059/965900

xuo3flqw

xuo3flqw2#

当你调用一个外部API时,它可能会被破坏或超时等。您可能需要一种更逐行的方法(可能不流行,但在某些情况下仍然有用)。假设你想使用Postgres本身,而不是一些外部编程,这可能对你有用:
创建PL/pgSQL块以使用游标处理挂单:

DO LANGUAGE plpgsql $$
DECLARE
    c CURSOR FOR SELECT id FROM orders WHERE status = 'PENDING' LIMIT 200;
    order_id INTEGER;
BEGIN
    OPEN c;
    LOOP
        FETCH c INTO order_id;
        EXIT WHEN NOT FOUND;
        PERFORM process_order(order_id);
    END LOOP;
    CLOSE c;
END;
$$;

繁重的工作在被调用的函数内部。此示例在此处放置显式提交(即每行),这可以允许一些行将工作而其他行不工作的可能性,并且它们可以稍后被拾取/重新处理。您还可以根据您的需要以及API调用实际上有多慢/多不稳定,包括“进行中”状态的逻辑。

CREATE OR REPLACE FUNCTION process_order(order_id INTEGER)
RETURNS VOID AS $$
DECLARE
    api_result BOOLEAN;
BEGIN
    -- Start a new transaction
    BEGIN
        -- just a placeholder, how you call that api isn't known
        api_result := call_external_api(order_id);

        IF api_result THEN
            UPDATE orders SET status = 'COMPLETED' WHERE id = order_id;
        ELSE
            UPDATE orders SET status = 'FAILED' WHERE id = order_id;
        END IF;
        
        -- Commit the transaction
        COMMIT;
    EXCEPTION
        WHEN OTHERS THEN
            -- Rollback the transaction in case of an error
            ROLLBACK;
    END;
END;
$$ LANGUAGE plpgsql;

或者,如果不需要这种程度的偏执,您可以在光标周围移动提交。注意行级提交会增加进程的时间(我不知道有多少)。它还可能对回滚日志大小等内容产生影响,因此可能会影响您的选择。

nqwrtyyt

nqwrtyyt3#

首先使用Pgx库访问postgresql。然后需要使用事务和行锁定。为了提高性能,你可以使用goroutine来进行并发选择和更新。
下面给出的是相同的示例代码。代码中不包含goroutine

import (
    "context"
    "fmt"

    "github.com/georgysavva/scany/pgxscan"
    "github.com/jackc/pgx/v4/pgxpool"
)

    func main() {
        pool, err := pgxpool.Connect(context.Background(), "postgres://kaushik:abcd@localhost:5432/users")
        if err != nil {
            panic(err)
        }
    
        defer func() {
            pool.Close()
        }()
    
        ctx := context.Background()
    
        tx, err := pool.Begin(ctx)
        if err != nil {
            panic(err)
        }
    
        defer func() {
            if err != nil {
                err = tx.Rollback(ctx)
                fmt.Printf("Error while rolling back transaction: %v\n", err)
                return
            }
            err = tx.Commit(ctx)
            if err != nil {
                fmt.Printf("Error while committing transaction: %v\n", err)
                return
            }
        }()
    
        _, err = tx.Exec(ctx, "LOCK TABLE employee IN ROW EXCLUSIVE MODE")
        if err != nil {
            panic(err)
        }
    
        var e Employee
        err = pgxscan.Get(ctx, tx, &e, "SELECT * FROM employee WHERE id = $1 FOR UPDATE", 1)
        if err != nil {
            panic(err)
        }
    
        if e.Status == "active" {
            fmt.Println("Employee is active, deactivating...")
            e.Status = "inactive"
    
            _, err = tx.Exec(ctx, "UPDATE employee SET status = $1 WHERE id = $2", e.Status, e.ID)
            if err != nil {
                panic(err)
            }
        }
    
    }
    
    type Employee struct {
        ID        int       `db:"id"`
        Name      string    `db:"name"`
        CreatedAt time.Time `db:"created_at"`
        UpdatedAt time.Time `db:"updated_at"`
        Status    string    `db:"status"`
    }

相关问题