python—减少在postgresql中使用多个内部连接插入记录的执行时间

4bbkushb  于 2021-07-13  发布在  Java
关注(0)|答案(0)|浏览(155)

我有这样一个用例,我必须每天在 DATAVANT_COVID_MATCH 表,同时连接其他3个表。

  1. No of records in each table:
  2. DATAVANT_COVID_MATCH = 10k
  3. COVID_PATNT_REGISTRY = 800k
  4. COVID_PATNT_REGISTRY_DEID = 800k
  5. MORTALITY_INDEX(Total count = 220 Million
  6. MORTALITY_INDEX ~= 10 Million records for each interval
  7. (Note: No of Records b/w 1980-2000 could have more than 15M records, and b/w 1940-1980 count could go down to <5M records)
  8. Partitioned Table:
  9. DATAVANT_COVID_MATCH
  10. MORTALITY_INDEX
  1. INDEX:
  2. INDEX created for every partition of MORTALITY_INDEX table
  3. example:
  4. CREATE INDEX mortality_index_1941_dod_idx
  5. ON datavant_stg_o.mortality_index_1940 USING btree
  6. (dod ASC NULLS LAST)
  7. TABLESPACE pg_default
  8. CREATE INDEX mortality_index_1941_1945_dod_idx
  9. ON datavant_stg_o.mortality_index_1941_1945 USING btree
  10. (dod ASC NULLS LAST)
  11. TABLESPACE pg_default;
  12. etc...

所以我的问题是,如何减少执行时间?如有任何意见/建议,我们将不胜感激。谢谢您
下面是当前每天运行的代码

  1. import os
  2. import logging
  3. import psycopg2
  4. import socket
  5. logging.basicConfig(level=logging.INFO)
  6. cpc_name = socket.gethostname()
  7. periods = ['1940']
  8. periods.extend(['{}_{}'.format(i, i + 4) for i in range(1941, 2031, 5)])
  9. if __name__ == "__main__":
  10. logging.info("Starting test process")
  11. logging.info(" cpc = {}".format(cpc_name) + '\n')
  12. for period in periods:
  13. # Do your psycopg2 connection here and get your cursor
  14. connection = psycopg2.connect(user = os.environ.get("DATABASE_USER", "USR_NAME"),
  15. password = os.environ.get("DATABASE_PASS", "DB_PASS"),
  16. host = os.environ.get("DATABASE_HOST", "db_host.internal.com"),
  17. port = 5432,
  18. dbname = os.environ.get("DATABASE_NAME", "psql_db"),
  19. options = "-c search_path=DATAVANT_O")
  20. with connection.cursor() as cursor:
  21. logging.info(str(connection.get_dsn_parameters()) + '\n')
  22. cursor.execute("SELECT version();")
  23. connection.commit()
  24. conn = cursor.fetchone()
  25. logging.info("You are connected to - " + str(conn) + '\n')
  26. cursor.execute("""
  27. INSERT INTO DATAVANT_O.DATAVANT_COVID_MATCH_{}
  28. SELECT
  29. CUST_LAST_NM,
  30. CUST_FRST_NM,
  31. CIGNA_DOB,
  32. CIGNA_ZIP,
  33. DATAVANT_DOD,
  34. DATAVANT_DOB,
  35. DEATH_VERIFICATION,
  36. DATA_SOURCE,
  37. INDIV_ENTPR_ID
  38. FROM
  39. (
  40. SELECT
  41. CR.PATNT_LAST_NM AS CUST_LAST_NM,
  42. CR.PATNT_FRST_NM AS CUST_FRST_NM,
  43. CRD.CUST_BRTH_DT AS CIGNA_DOB,
  44. CR.PATNT_POSTL_CD AS CIGNA_ZIP,
  45. MI.DOD AS DATAVANT_DOD,
  46. MI.DOB AS DATAVANT_DOB,
  47. MI.DEATH_VERIFICATION,
  48. MI.DATA_SOURCE,
  49. CRD.INDIV_ENTPR_ID,
  50. ROW_NUMBER () OVER (PARTITION BY CRD.INDIV_ENTPR_ID ORDER BY CRD.INDIV_ENTPR_ID DESC)
  51. FROM DATAVANT_O.COVID_PATNT_REGISTRY_DEID CRD
  52. INNER JOIN DATAVANT_STG_O.MORTALITY_INDEX_{} MI ON
  53. CRD.TOKEN_1 = MI.TOKEN_1 AND
  54. CRD.TOKEN_2 = MI.TOKEN_2 AND
  55. CRD.TOKEN_4 = MI.TOKEN_4
  56. INNER JOIN DATAVANT_O.COVID_PATNT_REGISTRY CR ON
  57. CR.INDIV_ENTPR_ID = CRD.INDIV_ENTPR_ID
  58. ) x
  59. WHERE
  60. ROW_NUMBER = 1;""". format(period, period)
  61. )
  62. # Commit and close your connection here
  63. connection.commit()
  64. count = cursor.rowcount
  65. if count == 0:
  66. logging.info("There are no matching records to insert in DATAVANT_COVID_MATCH table for Date: {} " . format(period) + '\n')
  67. else:
  68. logging.info("No of Record(s) Inserted in DATAVANT_COVID_MATCH table for Date: {} " . format(period) + "--> " + str(count) + '\n')
  69. connection.close()
  70. logging.info("PostgreSQL connection is closed" + "\n")

我是否应该在sql脚本中做些改进,或者通过向表中添加索引或添加更多分区来升级表,以帮助减少执行时间?

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题