postgresql 使用dbt时如何手动创建表或使用postgres分区?

v7pvogib  于 2023-02-22  发布在  PostgreSQL
关注(0)|答案(2)|浏览(273)

我想使用DBT将数据插入到分区表中,但发现不支持DBT Postgres分区。
通过另一种方式,我在pre_hook中创建了表和分区,但是在运行dbt时得到了错误“relation 'download_counts_p' already exists”
有什么建议吗?这是我的SQL和pre_hook配置

{{ config(
    materialized = 'table',
    indexes = [ ],
    pre_hook=[
        'CREATE TABLE IF NOT EXISTS "download_counts_p" (
                                              "channel_id" int8 NOT NULL,
                                              "product_id" int8 NOT NULL,
                                              "country_code" text NOT NULL,
                                              "year" int2  NULL,
                                              "month" int2 NOT NULL,
                                              "count" int8 NOT NULL,
                                              "count" int8 NOT NULL,
                                              "months" int8 NOT NULL
                                         ) partition by list(country_code)',
        "DO $$
    Declare unique_country_code varchar;
    BEGIN
        FOR unique_country_code IN
            SELECT country_code as unique_country_code FROM download_counts group by country_code

            LOOP
                EXECUTE  format('create table IF NOT EXISTS download_counts_p_%s partition of download_counts_p for values in (''%s'')', upper(unique_country_code), unique_country_code);
            END LOOP;
    END; $$;"]
)}}

select 1

ffscu2ro

ffscu2ro1#

这里发生了一些不同的事情。

  1. dbt的postgres适配器does not currently support是分区表的配置,这是正确的。这里的解决方案是创建一个new materialization来插入所需的额外DDL。您不希望在dbt中手动写入DDL--这被认为是一个大的反模式。您可能还希望在dbt-core(postgres适配器代码所在的位置)中打开一个问题来进行特性请求
    1.你的FOR ... LOOP ... END LOOP钩子应该使用jinja循环编写成它自己的模型,你可以使用run_query宏将数据返回到jinja上下文,在该页的底部有一个例子,它将查询结果提取到jinja上下文,然后使用{% for payment_method in results_list %}循环
osh3o9ms

osh3o9ms2#

在DBT中,DDL表的创建是自动完成的,运行钩子前或钩子后没有帮助,因为PostgreSQL不允许将表更改为具有分区子句。
代替pre_hook块--每个模块都需要这样做--可以修改来自DBT postgres的create table语句来处理表分区创建。这个解决方案支持范围和列表类型。
作为PostgreSQL DDL语句的示例

CREATE TABLE measurement (
                city_id         int not null,
                logdate         date not null,
                peaktemp        int,
                unitsales       int
            ) PARTITION BY RANGE (logdate);

修改此DDL以包含PARTITION BY子句之后,可以创建并附加分区
应用自适应
1-在你的项目中找到postgres dbt适配器,通常路径如下:

venv/lib/python3.10/site-packages/dbt/include/postgres/macros/adapters.sql

2-修改postgres__create_table_as宏内容如下:

{% macro postgres__create_table_as(temporary, relation, sql) -%}
{%- set unlogged = config.get(‘unlogged’, default=false) -%}
{%- set sql_header = config.get(‘sql_header’, none) -%}

{%- set fields_string = config.get(‘fields_string’, none) -%}

{%- set partition_type = config.get(‘partition_type’, none) -%}
{%- set partition_column = config.get(‘partition_column’, none) -%}

{%- set partition_range_start_end = config.get(‘partition_range_start_end’, none) -%}
{%- set partition_list_values = config.get(‘partition_list_values’, none) -%}

{%- set is_partition = partition_type is not none -%}

{{ sql_header if sql_header is not none }}

create {% if temporary -%}
temporary
{%- elif unlogged -%}
unlogged
{%- endif %} table {{ relation }}
as (
{{ sql }}
{% if is_partition and not temporary %}
limit 0
{% endif %}
);

{% if is_partition and not temporary %}

{%- set relation_str=relation -%}
{%- set relation_str -%}
{{ relation_str|string|replace(‘“‘, ‘’) }}
{%- endset -%}

{%- set relation_str2-%}{{relation_str}}_tmp4part{%- endset -%}
{%- set relation_str2-%}”{{relation_str2|replace(‘.’,’”.”’)}}”{%- endset -%}

CREATE TABLE {{relation_str2}} ( like {{ relation }} including all) PARTITION BY {{ partition_type }} ({{ partition_column }});
DROP TABLE {{ relation }};
CREATE TABLE {{ relation }} ( like {{relation_str2}} including all ) PARTITION BY {{ partition_type }} ({{ partition_column }});
DROP TABLE {{relation_str2}};

{% if partition_type|lower == ‘range’ %}
{% for rng in partition_range_start_end %}
{% set start = rng[0] %}
{% set end = rng[1] %}
{%- set table4part-%}{{relation_str}}_{{loop.index}} {%- endset -%}
{%- set table4part-%}”{{table4part|replace(‘.’,’”.”’)}}” {%- endset -%}
CREATE TABLE {{table4part}} PARTITION OF {{ relation }} FOR VALUES FROM (‘{{start}}’) TO (‘{{end}}’);
{% endfor %}
{% elif partition_type|lower == ‘list’ %}
{% for val in partition_list_values.split(‘,’) %}
{%- set table4part-%}{{relation_str}}_{{loop.index}} {%- endset -%}
{%- set table4part-%}”{{table4part|replace(‘.’,’”.”’)}}” {%- endset -%}
CREATE TABLE {{table4part}} PARTITION OF {{ relation }} FOR VALUES IN ({{val}});
{% endfor %}
{%- endif %}

{# INSERT DATA #}
insert into {{ relation }} (
{{ sql }}
);
{% endif %}

{%- endmacro %}

3-选择要创建的模块中的分区列表,如下所示:

{%- set partition_type = ‘LIST’ -%}
            {%- set partition_column = ‘level’ -%}
            {%- set partition_list_values=’1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32,33,34,35,36,37,38,39,40,41,42,43,44,45,46,47,48,49,50,51'-%}

            {{
            config(
            materialized=’incremental’,
            partition_type=partition_type,
            partition_column=partition_column,
            partition_list_values=partition_list_values,
            unique_key=[‘col1’,’col2’]
            )
            }}

参考和更多详情https://medium.com/@fmohammad_91999/postgresql-table-partitioning-in-dbt-6d6ed82e90ca

相关问题