PostgreSQL DBLink: using a variable from the local database in the WHERE clause

Posted by ruarlubt on 2023-06-22 in PostgreSQL

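I have two PostgreSQL databases (a source and a destination), each containing multiple tables. I have managed to transfer data from the source to the destination with DBLink, copying either all rows or a subset of them.
My question is how to make the statement more elegant, given that it is generated dynamically. For example, I currently have the following statement: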

    DO $$
    DECLARE
        upsert_value timestamp;
    BEGIN
        -- Last synced timestamp, read on the destination; falls back to the epoch.
        upsert_value := COALESCE((SELECT "timestamp" FROM system.lastupdatetable_stg WHERE "group_id" = 54 AND "tenant_id" = 23 AND "table_id" = 3327 LIMIT 1), to_timestamp(0));

        -- Pull the changed rows from the source and upsert them locally.
        INSERT INTO public."AccessToDivisions"
        ("CreatedAt", "Default", "Division", "Employee", "GCRecord", "ModifiedAt", "OID", "OptimisticLockField", "TenantId")
        SELECT "CreatedAt", "Default", "Division", "Employee", "GCRecord", "ModifiedAt", "OID", "OptimisticLockField", "TenantId"
        FROM dblink('<connection>',
        'SELECT "CreatedAt", "Default", "Division", "Employee", "GCRecord", "ModifiedAt", "OID", "OptimisticLockField", "TenantId" FROM public."AccessToDivisions" t
        WHERE 1=1
        AND "ModifiedAt" > ' || quote_literal(upsert_value)
        ) AS rt("CreatedAt" timestamp without time zone, "Default" boolean, "Division" integer, "Employee" character(36), "GCRecord" integer, "ModifiedAt" timestamp without time zone, "OID" integer, "OptimisticLockField" integer, "TenantId" character(36))
        ON CONFLICT("TenantId", "OID")
        DO UPDATE SET "CreatedAt" = excluded."CreatedAt", "Default" = excluded."Default", "Division" = excluded."Division", "Employee" = excluded."Employee", "GCRecord" = excluded."GCRecord", "ModifiedAt" = excluded."ModifiedAt", "OptimisticLockField" = excluded."OptimisticLockField";

        -- Record the new high-water mark for the next run.
        INSERT INTO system.lastupdatetable_stg ("group_id", "tenant_id", "table_id", "snapshot", "timestamp") VALUES (54, 23, 3327, false, (SELECT MAX("ModifiedAt") FROM public."AccessToDivisions")) ON CONFLICT ("group_id", "tenant_id", "table_id") DO UPDATE SET "timestamp" = excluded."timestamp";
    END $$;

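system.lastupdatetable_stg resides in the destination database. Is it possible to skip declaring the variable upsert_value and instead perform the lookup inside the dblink statement, like this?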

    DO $$
    BEGIN
        -- The subquery on system.lastupdatetable_stg is part of the string sent to the source.
        INSERT INTO public."AccessToDivisions"
        ("CreatedAt", "Default", "Division", "Employee", "GCRecord", "ModifiedAt", "OID", "OptimisticLockField", "TenantId")
        SELECT "CreatedAt", "Default", "Division", "Employee", "GCRecord", "ModifiedAt", "OID", "OptimisticLockField", "TenantId"
        FROM dblink('<connection>',
        'SELECT "CreatedAt", "Default", "Division", "Employee", "GCRecord", "ModifiedAt", "OID", "OptimisticLockField", "TenantId" FROM public."AccessToDivisions" t
        WHERE 1=1
        AND "ModifiedAt" > COALESCE((SELECT "timestamp" FROM system.lastupdatetable_stg WHERE "group_id" = 54 AND "tenant_id" = 23 AND "table_id" = 3327 LIMIT 1), to_timestamp(0))'
        ) AS rt("CreatedAt" timestamp without time zone, "Default" boolean, "Division" integer, "Employee" character(36), "GCRecord" integer, "ModifiedAt" timestamp without time zone, "OID" integer, "OptimisticLockField" integer, "TenantId" character(36))
        ON CONFLICT("TenantId", "OID")
        DO UPDATE SET "CreatedAt" = excluded."CreatedAt", "Default" = excluded."Default", "Division" = excluded."Division", "Employee" = excluded."Employee", "GCRecord" = excluded."GCRecord", "ModifiedAt" = excluded."ModifiedAt", "OptimisticLockField" = excluded."OptimisticLockField";

        INSERT INTO system.lastupdatetable_stg ("group_id", "tenant_id", "table_id", "snapshot", "timestamp") VALUES (54, 23, 3327, false, (SELECT MAX("ModifiedAt") FROM public."AccessToDivisions")) ON CONFLICT ("group_id", "tenant_id", "table_id") DO UPDATE SET "timestamp" = excluded."timestamp";
    END $$;

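The problem with the second query is that it runs against the source database, where system.lastupdatetable_stg cannot be found.
Is there a way to make that SELECT execute on the destination, or a way to pass parameters into dblink, rather than relying on plain string interpolation with quotes?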

Answer #1 (fcwjkofz)

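It seems there is no way to have a command inside dblink fetch something from the destination, because the dblink query executes on the source. I think @Frank Heiken's comment partly solves this by using a foreign table on the source database, but that adds complexity if you have to deal with hundreds of databases.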
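For reference, the foreign-table route mentioned above could look roughly like the following, run on the source database. This is only a sketch under assumptions: it uses the postgres_fdw extension, and the server name destination_srv, the host, the database name, and the credentials are all hypothetical placeholders that do not come from the question.

```sql
-- Run on the SOURCE database. All connection details below are hypothetical.
CREATE EXTENSION IF NOT EXISTS postgres_fdw;

-- Describe how to reach the destination database.
CREATE SERVER destination_srv
    FOREIGN DATA WRAPPER postgres_fdw
    OPTIONS (host 'destination.example.com', port '5432', dbname 'destination_db');

-- Map the local role to a role on the destination.
CREATE USER MAPPING FOR CURRENT_USER
    SERVER destination_srv
    OPTIONS (user 'sync_user', password 'secret');

-- Expose the destination's tracking table under the same schema-qualified name,
-- so a query executed on the source can resolve system.lastupdatetable_stg.
CREATE SCHEMA IF NOT EXISTS system;
IMPORT FOREIGN SCHEMA system
    LIMIT TO (lastupdatetable_stg)
    FROM SERVER destination_srv
    INTO system;
```

This setup would have to be repeated on every source database, which is the added complexity the answer refers to.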
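I was able to use format with the %L specifier as a replacement for quote-based interpolation. It makes the SQL statement cleaner and simplifies the higher-level code (C#) that generates the following: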

    DO $$
    DECLARE
        upsert_value timestamp;
    BEGIN
        -- Last synced timestamp, read on the destination; falls back to the epoch.
        upsert_value := COALESCE((SELECT "timestamp" FROM system.lastupdatetable_stg WHERE "group_id" = 54 AND "tenant_id" = 23 AND "table_id" = 3327 LIMIT 1), to_timestamp(0));

        -- format() with %L embeds the value as a properly quoted SQL literal.
        INSERT INTO public."AccessToDivisions"
        ("CreatedAt", "Default", "Division", "Employee", "GCRecord", "ModifiedAt", "OID", "OptimisticLockField", "TenantId")
        SELECT "CreatedAt", "Default", "Division", "Employee", "GCRecord", "ModifiedAt", "OID", "OptimisticLockField", "TenantId"
        FROM dblink('<connection>',
        format('SELECT "CreatedAt", "Default", "Division", "Employee", "GCRecord", "ModifiedAt", "OID", "OptimisticLockField", "TenantId" FROM public."AccessToDivisions" t
        WHERE 1=1
        AND "ModifiedAt" > %L', upsert_value)
        ) AS rt("CreatedAt" timestamp without time zone, "Default" boolean, "Division" integer, "Employee" character(36), "GCRecord" integer, "ModifiedAt" timestamp without time zone, "OID" integer, "OptimisticLockField" integer, "TenantId" character(36))
        ON CONFLICT("TenantId", "OID")
        DO UPDATE SET "CreatedAt" = excluded."CreatedAt", "Default" = excluded."Default", "Division" = excluded."Division", "Employee" = excluded."Employee", "GCRecord" = excluded."GCRecord", "ModifiedAt" = excluded."ModifiedAt", "OptimisticLockField" = excluded."OptimisticLockField";

        -- Record the new high-water mark for the next run.
        INSERT INTO system.lastupdatetable_stg ("group_id", "tenant_id", "table_id", "snapshot", "timestamp") VALUES (54, 23, 3327, false, (SELECT MAX("ModifiedAt") FROM public."AccessToDivisions")) ON CONFLICT ("group_id", "tenant_id", "table_id") DO UPDATE SET "timestamp" = excluded."timestamp";
    END $$;
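To see what %L contributes on its own, it can help to run format without dblink. The query below is a small illustration rather than part of the original statement; the sample values are made up, and the timestamp rendering assumes the default ISO DateStyle.

```sql
SELECT
    -- A timestamp becomes a quoted literal: AND "ModifiedAt" > '2023-06-22 10:15:00'
    format('AND "ModifiedAt" > %L', timestamp '2023-06-22 10:15:00') AS ts_filter,
    -- Embedded quotes are doubled: AND "Employee" = 'O''Brien'
    format('AND "Employee" = %L', 'O''Brien') AS text_filter,
    -- A NULL argument is rendered as the bare keyword: AND "ModifiedAt" > NULL
    format('AND "ModifiedAt" > %L', NULL::timestamp) AS null_filter;
```

The last column shows why the fallback to to_timestamp(0) matters: a comparison against NULL would match no rows on the source, so the first run would copy nothing.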

So, in the end, format also works inside the dblink call, as expected. The C# side can then generate the following.
For a full load:

    FROM dblink('<connection>',
    'SELECT "CreatedAt", "Default", "Division", "Employee", "GCRecord", "ModifiedAt", "OID", "OptimisticLockField", "TenantId" FROM public."AccessToDivisions" t
    WHERE 1=1'

For an incremental load:

    FROM dblink('<connection>',
    format('SELECT "CreatedAt", "Default", "Division", "Employee", "GCRecord", "ModifiedAt", "OID", "OptimisticLockField", "TenantId" FROM public."AccessToDivisions" t
    WHERE 1=1
    AND "ModifiedAt" > %L', upsert_value)
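Since the arguments of dblink are evaluated in the local (destination) session before the query text is sent to the source, the lookup can probably be placed directly inside format, which would remove the need for a variable and a DO block. The following is an untested sketch, trimmed to two columns for brevity. `<connection>` is the same placeholder as above, and the `::timestamp` cast is an addition of mine to mirror the assignment to a timestamp variable.

```sql
-- Sketch only: the scalar subquery runs on the destination,
-- and only the finished query string is shipped to the source.
SELECT rt."OID", rt."ModifiedAt"
FROM dblink('<connection>',
    format('SELECT "OID", "ModifiedAt" FROM public."AccessToDivisions" t
            WHERE 1=1
            AND "ModifiedAt" > %L',
           COALESCE(
               (SELECT "timestamp" FROM system.lastupdatetable_stg
                WHERE "group_id" = 54 AND "tenant_id" = 23 AND "table_id" = 3327
                LIMIT 1),
               to_timestamp(0)::timestamp))
) AS rt("OID" integer, "ModifiedAt" timestamp without time zone);
```

The same FROM clause could be dropped into the INSERT ... ON CONFLICT statement in place of the variable-based version.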