How do I create a Spark DataFrame in Python using a Snowflake connection?

z4bn682m posted on 2021-05-29 in Spark

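I am new to Spark and Python. I have a SQL statement stored in a Python variable, and we use a Snowflake database. How can I create a Spark DataFrame from that SQL using a Snowflake connection?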

import sf_connectivity  # our in-house module for connecting to the Snowflake database

emp = 'Select * From Employee'
snowflake_connection = sf_connectivity.collector()  # method that establishes the Snowflake connection
# Requirement 1: create a Spark DataFrame (sf_df) using 'emp' and 'snowflake_connection'
# Requirement 2: sf_df.createOrReplaceTempView("Temp_Employee")

Which packages or libraries does this require, and how can I do it?

mwyxok5s1#

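The documentation that helped me solve this is: https://docs.databricks.com/data/data-sources/snowflake.html
It took me a while to figure out how to get it working. After asking a lot of questions, I had my company's IT department set up a Snowflake user account with private/public key authentication, and they configured that ID so it can be accessed from our company's Databricks account.
With that in place, the code below is an example of how to pass a SQL command to Spark as a variable and have Spark turn the result into a DataFrame.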

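# Connection options passed to the Snowflake connector.
optionsSource = dict(
    sfUrl="mycompany.east-us-2.azure.snowflakecomputing.com",  # Snowflake account URL
    sfUser="my_service_acct",
    pem_private_key=dbutils.secrets.get("my_scope", "my_secret"),  # private key read from a Databricks secret scope
    sfDatabase="mydatabase",  # Snowflake database
    sfSchema="myschema",  # Snowflake schema
    sfWarehouse="mywarehouse",
    sfRole="myrole",
)

# The SQL statement is held in an ordinary Python string.
sqlcmd = '''
select current_date;
'''

# "query" tells the connector to run the statement and return the result as a DataFrame.
df = spark.read.format("snowflake").options(**optionsSource).option("query", sqlcmd).load()
display(df)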
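
The question's second requirement, registering the result as a temporary view, is not covered by the snippet above. A minimal sketch of one way to do it follows, assuming the `spark` session and the `optionsSource` dict from that snippet. `load_query_as_view` is a hypothetical helper name, not part of the connector.

```python
def load_query_as_view(spark, options, query, view_name, source="snowflake"):
    """Run `query` through the Snowflake connector and expose it as a temp view.

    `spark` is an existing SparkSession and `options` is a dict of connector
    options (sfUrl, sfUser, ...). The helper name is hypothetical.
    """
    df = (
        spark.read.format(source)
        .options(**options)
        .option("query", query)
        .load()
    )
    # Register the DataFrame under `view_name` so it can be queried with spark.sql().
    df.createOrReplaceTempView(view_name)
    return df
```

It could be called as `sf_df = load_query_as_view(spark, optionsSource, "Select * From Employee", "Temp_Employee")`, after which `spark.sql("select * from Temp_Employee")` reads from the view. Outside Databricks the short name `"snowflake"` may not be registered; in that case pass `source="net.snowflake.spark.snowflake"`.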

0sgqnhkj2#

For public/private key authentication you first need to generate the key pair (see https://community.snowflake.com/s/article/how-to-connect-snowflake-with-spark-connector-using-public-private-key ). You can then use the code below.

import os
import re

from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .config("spark.jars", "<path/to/>/snowflake-jdbc-<version>.jar,<path/to/>/spark-snowflake_2.11-2.4.13-spark_2.4.jar") \
    .config("spark.repl.local.jars",
            "<path/to/>/snowflake-jdbc-<version>.jar,<path/to/>/spark-snowflake_2.11-2.4.13-spark_2.4.jar") \
    .config("spark.sql.catalogImplementation", "in-memory") \
    .getOrCreate()

# Disable query pushdown for this session through the connector's JVM utility class.
spark._jvm.net.snowflake.spark.snowflake.SnowflakeConnectorUtils.disablePushdownSession(
    spark._jvm.org.apache.spark.sql.SparkSession.builder().getOrCreate())

with open("<path/to/>/rsa_key.p8", "rb") as key_file:
    p_key = serialization.load_pem_private_key(
        key_file.read(),
        password=os.environ['PRIVATE_KEY_PASSPHRASE'].encode(),
        backend=default_backend()
    )

pkb = p_key.private_bytes(
    encoding=serialization.Encoding.PEM,
    format=serialization.PrivateFormat.PKCS8,
    encryption_algorithm=serialization.NoEncryption()
)
pkb = pkb.decode("UTF-8")
pkb = re.sub("-*(BEGIN|END) PRIVATE KEY-*\n", "", pkb).replace("\n", "")

sfOptions = {
    "sfURL": "<URL>",
    "sfAccount": "<ACCOUNTNAME>",
    "sfUser": "<USER_NAME>",
    "pem_private_key": pkb,
    # "sfPassword": "xxxxxxxxx",
    "sfDatabase": "<DBNAME>",
    "sfSchema": "<SCHEMA_NAME>",
    "sfWarehouse": "<WH_NAME>",
    "sfRole": "<ROLENAME>",
}

SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake"

# A bare table name goes in "dbtable"; use "query" instead for a full SELECT statement.
df = spark.read.format(SNOWFLAKE_SOURCE_NAME) \
    .options(**sfOptions) \
    .option("dbtable", "<TABLENAME>") \
    .load()

df.show()
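
The `re.sub` step in the code above reduces the PEM text to the bare base64 body that is passed as `pem_private_key`. Here is a small sketch of that step in isolation, using only the standard library. `strip_pem_armor` is a hypothetical helper name, and the key text in the example is a short placeholder, not a real key.

```python
import re


def strip_pem_armor(pem_text: str) -> str:
    """Return the base64 body of an unencrypted PKCS#8 PEM key as a single line."""
    # Drop the "-----BEGIN PRIVATE KEY-----" and "-----END PRIVATE KEY-----" lines.
    body = re.sub(r"-+(BEGIN|END) PRIVATE KEY-+\r?\n?", "", pem_text)
    # Remove the remaining line breaks so the value is one continuous string.
    return "".join(body.split())


# Placeholder input for illustration only.
placeholder = "-----BEGIN PRIVATE KEY-----\nQUJD\nREVG\n-----END PRIVATE KEY-----\n"
print(strip_pem_armor(placeholder))  # QUJDREVG
```

Keeping this in a function makes the step easy to check on a dummy string before a real key is involved.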
