postgresql 同时插入和更新记录

krcsximq  于 2023-10-18  发布在  PostgreSQL
关注(0)|答案(1)|浏览(154)

下面是一个Postgres表:

  1. create table test (
  2. id serial,
  3. contract varchar,
  4. amount int,
  5. aggregated int,
  6. is_aggregate int
  7. );
  1. insert into test (contract, amount, aggregated)
  2. values ('abc', 100, 0),
  3. ('abc', 200, 0),
  4. ('xyz', 50, 0),
  5. ('xyz', 60, 0);

| ID|合同|量|聚合|是聚集的|
| --|--|--|--|--|
| 1 |ABC| 100 | 0 ||
| 2 |ABC| 200 | 0 ||
| 3 |xyz| 50 | 0 ||
| 4 |xyz| 60 | 0 ||
我试图写一个SQL语句,在每个合约的基础上插入一个聚合行,因此结果应该是这样的:
| ID|合同|量|聚合|是聚集的|
| --|--|--|--|--|
| 1 |ABC| 100 | 1 ||
| 2 |ABC| 200 | 1 ||
| 3 |xyz| 50 | 1 ||
| 4 |xyz| 60 | 1 ||
| 5 |ABC| 300 || 1 |
| 6 |xyz| 110 || 1 |
由于新行不断从另一个数据源添加,因此插入聚合行并将已聚合的行中的“aggregated”设置为1需要一次性完成,以避免并发问题。
我试过几种方法,但我只走了这么远:

  1. INSERT INTO test (contract, amount, aggregated, is_aggregate)
  2. SELECT contract,
  3. SUM(amount) AS sum_amount,
  4. 1,
  5. 1
  6. FROM test
  7. WHERE aggregated IS NULL
  8. OR aggregated = 0
  9. GROUP BY contract
  10. HAVING COUNT(*) > 1;

这将插入聚合的行,但我不知道如何包含一个子句,以便为已聚合的行将“aggregated”更新为1。
编辑日期:
经过几次“再压缩”后,数据看起来像这样:
| ID|合同|量|聚合|是聚集的|
| --|--|--|--|--|
| 1 |ABC| 100 | 1 ||
| 2 |ABC| 200 | 1 ||
| 3 |xyz| 50 | 1 ||
| 4 |xyz| 60 | 1 ||
| 5 |ABC| 300 | 1 | 1 |
| 6 |xyz| 110 | 1 | 1 |
| 7 |ABC| 20 | 1 ||
| 8 |ABC| 30 | 1 ||
| 9 |xyz| 70 | 1 ||
| 10 |xyz| 80 | 1 ||
| 11 |ABC| 50 | 1 | 1 |
| 12 |xyz| 150 | 1 | 1 |
| 13 |ABC| 350 || 1 |
| 14 |xyz| 260 || 1 |
其中,ID 11和12再次是第一次运行的聚合(类似于之前的ID 5和6),并且ID 13和14是先前聚合运行的聚合(即,即分别为ID 5和11以及ID 6和12)。最后一次运行的聚合可以通过它们的'aggregated' = NULL来确定
注意:还有额外的“时间”列,通过这些列可以确定每行的聚合级别,以便在以后的业务逻辑中使用。为简洁起见,这里省略了这些列。
有人能帮帮我吗?先谢了。

64jmpszr

64jmpszr1#

将数据点和聚合数据点放在同一个表中并不是一个理想的设计模式,最好将它们分开放在两个表中。

一张table

如果你仍然只想使用一个表,那么你可以使用下面的代码。

created_at过滤

添加名为created_attimestamp列,默认值为NOW()。请注意,NOW()的值在Postgres中的事务中不会更改,并且它还具有读取一致性(快照隔离),这可以防止在此事务开始后由其他进程插入/修改的行包含在此事务中。

  1. -- Start a transaction
  2. BEGIN;
  3. -- Aggregate
  4. INSERT INTO test (contract, amount, aggregated, is_aggregate)
  5. SELECT contract,
  6. SUM(amount),
  7. NULL,
  8. 1
  9. FROM test
  10. WHERE aggregated IS NULL
  11. OR aggregated = 0
  12. GROUP BY contract
  13. HAVING COUNT(*) > 1;
  14. -- Mark aggregated rows as aggregated
  15. UPDATE test
  16. SET aggregated = 1
  17. WHERE id IN (
  18. SELECT id
  19. FROM test
  20. WHERE (aggregated IS NULL
  21. OR aggregated = 0)
  22. AND created_at <> NOW()
  23. GROUP BY contract
  24. HAVING COUNT(*) > 1
  25. );
  26. -- Commit the transaction
  27. COMMIT;

使用显式行锁定

添加名为is_lockedboolean列,默认值为NULL。这是因为聚合条件在聚合前后返回不同的行。另一种方法是使用插入行的时间戳在最终更新中过滤掉它们,但我不确定哪些列可用,因为表定义是部分的。
此代码不是并行安全的-在任何给定时间只运行一个聚合进程。

  1. -- Start a transaction
  2. BEGIN;
  3. -- Lock the rows you want to aggregate
  4. UPDATE test
  5. SET is_locked = TRUE
  6. WHERE id IN (
  7. SELECT id
  8. FROM test
  9. WHERE is_locked IS NULL
  10. AND (aggregated IS NULL OR aggregated = 0)
  11. AND contract IN (
  12. SELECT contract
  13. FROM test
  14. WHERE is_locked IS NULL
  15. AND (aggregated IS NULL OR aggregated = 0)
  16. GROUP BY contract
  17. HAVING COUNT(*) > 1
  18. )
  19. );
  20. -- Insert the data from the locked rows
  21. INSERT INTO test (contract, amount, aggregated, is_aggregate)
  22. SELECT contract,
  23. SUM(amount) AS sum_amount,
  24. NULL,
  25. 1
  26. FROM test
  27. WHERE is_locked = TRUE
  28. GROUP BY contract;
  29. -- Update the aggregated column to set it to 1 for the locked rows
  30. -- and remove the locks
  31. UPDATE test
  32. SET aggregated = 1,
  33. is_locked = NULL
  34. WHERE is_locked = TRUE;
  35. -- Commit the transaction
  36. COMMIT;

两张表

如果你想使用一个单独的聚合表,请参阅下文。
向测试表中添加一个时间戳,并将该时间戳用于聚合目的。

以下是概念性解决方案提案。确切的语法取决于您使用的数据库引擎。

修改后的测试表定义:

  1. create table test (
  2. id serial,
  3. contract varchar,
  4. amount int,
  5. created_at timestamp DEFAULT CURRENT_TIMESTAMP
  6. );

新的聚合表:

  1. create table test_agg (
  2. contract varchar,
  3. amount int,
  4. aggregated_to timestamp
  5. );
  6. CREATE UNIQUE INDEX test_agg_contract_uq ON test_agg (contract);

修改后的SQL语句,如果聚合表中没有行,则删除该行,或者删除现有行。

  1. INSERT INTO test_agg (contract, amount, aggregated_to)
  2. SELECT t.contract,
  3. SUM(t.amount),
  4. MAX(t.created_at)
  5. FROM test t,
  6. LEFT OUTER JOIN test_agg ta ON ta.contract = t.contract
  7. WHERE
  8. ta.aggregated_to IS NULL
  9. OR t.created_at > ta.aggregated_to
  10. GROUP BY t.contract
  11. ON DUPLICATE KEY UPDATE
  12. amount = amount + VALUES(amount),
  13. aggregated_to = VALUES(aggregated_to);
展开查看全部

相关问题