从使用Python的Snowflake存储过程的输出创建表

3zwjbxry  于 2023-10-21  发布在  Python
关注(0)|答案(1)|浏览(133)

我已经编写了一个透视表的存储过程(作为Snowflake中缺少动态透视表的解决方案)。

CREATE OR REPLACE PROCEDURE pivot_table(tableName VARCHAR, index VARCHAR, columns VARCHAR, vals VARCHAR)
RETURNS TABLE()
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
PACKAGES = ('snowflake-snowpark-python')
HANDLER = 'pivot_the_table'
AS
$$
def pivot_the_table(session, table_name, index, columns, vals):
    df = session.table(table_name).to_pandas()
    df.columns = df.columns.str.lower()
    SEP = ","
    index = index.split(SEP) if SEP in index else index
    columns = columns.split(SEP) if SEP in columns else columns
    vals = vals.split(SEP) if SEP in vals else vals
    pvt = df.pivot_table(index=index, values=vals, columns=columns, aggfunc="sum").reset_index()
    return session.create_dataframe(pvt)
$$;

它工作正常,但我不知道如何将输出保存为实际的Snowflake表。或者有没有一种方法可以用 snowflake 函数来实现这一点?
在一个完美的世界里,我可以做这样的事情:

CREATE OR REPLACE TABLE some_table AS
SELECT
   *
FROM (
    pivot_table(table, "month", "year", "sales")
) AS pvt
LEFT JOIN some_other_table AS oth ON pvt.month = oth.month;

我知道存储过程也可以使用SQL作为语言,但如果可能的话,我宁愿使用Python。

oknwwptz

oknwwptz1#

看来我想明白了。这将把表保存到活动数据库和模式中:

CREATE OR REPLACE PROCEDURE pivot_table(tableName VARCHAR, index VARCHAR, columns VARCHAR, vals VARCHAR, output_table_name VARCHAR)
RETURNS varchar
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
PACKAGES = ('snowflake-snowpark-python')
HANDLER = 'pivot_the_table'
AS
$$
def pivot_the_table(session, table_name, index, columns, vals, output_table_name):
    df = session.table(table_name).to_pandas()
    df.columns = df.columns.str.lower()
    SEP = ","
    index = index.split(SEP) if SEP in index else index
    columns = columns.split(SEP) if SEP in columns else columns
    vals = vals.split(SEP) if SEP in vals else vals
    pvt = df.pivot_table(index=index, values=vals, columns=columns, aggfunc="sum").reset_index()
    session.create_dataframe(pvt).write.mode("overwrite").save_as_table(output_table_name.upper(), table_type="")
$$;

相关问题