pyspark 使用目标方言从SQL查询创建(Py)Spark Dataframe

8hhllhi2  于 2022-11-21  发布在  Spark
关注(0)|答案(3)|浏览(130)

赶紧的,我急道:从T-SQL(SQL Server)中的复杂查询和/或SQL Server存储过程的输出中创建Spark Dataframe 。
据我所知,Spark不允许以底层数据源的方言执行查询。是的,有a way来获取低级对象并执行存储过程,但在这种方式下,我在输出中没有Spark DF。
因此,我想以经典的pyodbc方式执行查询,获得结果,然后使用提供数据和模式的函数SparkSession.createDataFrame(data, schema=None, samplingRatio=None, verifySchema=True)构建Spark数据框架。(一对列表 (列名、数据类型))。请按照一个工作示例从SQL Server的本地示例(生成和)提取示例数据:

import pyodbc

connection_string = "Driver={SQL Server};Server=LOCALHOST;Database=master;Trusted_Connection=yes;"
db_connection = pyodbc.connect(connection_string)

sql_query = """
SET NOCOUNT ON
DECLARE @TBL_TEST AS TABLE (
    column_1 INT NOT NULL PRIMARY KEY CLUSTERED IDENTITY(1, 1),
    column_2 VARCHAR(10) NOT NULL,
    column_3 VARCHAR(20) NULL,
    column_4 INT NOT NULL
)

INSERT INTO @TBL_TEST (column_2, column_3, column_4)
VALUES
('test1_col2', 'test1_col3', 100),
('test2_col2', 'test2_col3', 200),
('test3_col2', NULL, 300)

SET NOCOUNT OFF
SELECT t.* FROM @TBL_TEST AS t
"""

cursor = db_connection.cursor()
rows = cursor.execute(sql_query).fetchall()
cursor.close()
db_connection.close()

print(rows)

如何从返回的游标中提取模式并获得一个 schema 对象以给予给createDataFrame()函数?
记住,我的目标是在题目上,所以其他方式也是欢迎的!
提前感谢!

9udxz4iz

9udxz4iz1#

如果使用pyodbc,catalyst优化器生成的java字节码只作为一个节点(executor)运行,而不是整个集群。对于更大的数据集,这会妨碍集群的充分利用和性能问题。

最好使用JDBC的spark驱动程序,微软有一个。
https://learn.microsoft.com/en-us/sql/connect/spark/connector?view=sql-server-ver16
将复杂的T-SQL创建为视图,然后读取它们。这就是Spark为阅读文件所做的。使用JDBC驱动程序(spark),如果需要,它将允许您通过更改分区方法来并行读取。
为正确版本的spark安装Marven库。

我使用的是Spark版本〉3.1。
我有一个冒险作品数据库,其中有一个视图叫v。

#
#  Set connection properties
#

server_name = "jdbc:sqlserver://svr4tips2030.database.windows.net"
database_name = "dbs4advwrks"
url = server_name + ";" + "databaseName=" + database_name + ";"
table_name = "dbo.vDMPrep"
user_name = "enter your user here"
password = "enter your password here"

使用JDBC驱动程序进行典型的spark.read()调用。

df = spark.read \
        .format("com.microsoft.sqlserver.jdbc.spark") \
        .option("url", url) \
        .option("dbtable", table_name) \
        .option("user", user_name) \
        .option("password", password).load()

display(df)

以下是显示 Dataframe 的结果。

数据框是严格类型化的吗?答案是肯定的,因为它从SQL Server获取字段信息。
x1c4d 1x指令集
最后但并非最不重要的一点是,视图是否复杂?下图显示了8个表被联接和聚合以获得视图的最终结果。

总之,使用数据库中的视图为Spark预编译数据集。使用Microsoft的JDBC驱动程序通过dataframe读取和写入SQL Server。
至于存储过程,有一种方法可以使用驱动程序来执行非查询。我将不得不寻找代码。请继续关注更新或第2部分。

apeeds0o

apeeds0o2#

这是答案的第二部分。没有好的方法将存储过程调用的结果作为 Dataframe 返回。
以下是MSFT github站点上该驱动程序的链接,声明不支持存储过程。
https://github.com/microsoft/sql-spark-connector/issues/21
这是一个黑客的工作。
在我的例子中,我的SP要做一些工作,并将其保存到一个临时表中。然后使用上述技术读取该表。
下面的代码删除表(如果存在),然后重新加载。

-- 
-- Sample Call
-- 
CREATE PROCEDURE dbo.StackOverFlowTest
AS
BEGIN
    DROP TABLE IF EXISTS stage.DimSalesTerritory;
    SELECT * INTO stage.DimSalesTerritory FROM dbo.DimSalesTerritory
END

下面是进入底层JAVA驱动程序管理器的代码。它有一个调用SP的属性。

#
#  Grab the low level driver manager, exec sp
#

driver_manager = spark._sc._gateway.jvm.java.sql.DriverManager
connection = driver_manager.getConnection(url, user_name, password)
connection.prepareCall("EXEC dbo.StackOverFlowTest").execute()
connection.close()

使用spark.read()从SP填充的新表中检索数据。

ki1q1bka

ki1q1bka3#

我希望这适合你的用例。同样,这将不会伸缩,因为它运行在控制节点(执行器)。如果你有一个5节点集群,这将只运行在1个节点。
也就是说,我们可以让spark推断数据类型。如果你看pyodbc文档,你必须安装一个本地ODBC驱动程序。这对工作站上的anaconda是好的,但对spark集群是坏的。相反,用户pymssql模块是自包含的本机代码。使用集群库的PyPi部分安装。
https://learn.microsoft.com/en-us/sql/connect/python/pymssql/python-sql-driver-pymssql?view=sql-server-ver16
现在我们有了一个驱动程序,让我们编写一个模块,它将从SELECT语句或存储过程调用EXEC返回一个 Dataframe 。

#
#  Create function to call tsql + return df
#

# use module
import pymssql  

# define function
def call_tsql(info):
  
  # make connection
  conn = pymssql.connect(server=info[0], user=info[1], password=info[2], database=info[3])  
  
  # open cursor
  cursor = conn.cursor()  
  cursor.execute(info[4])
  
  # grab column data (name, type, ...)
  desc = cursor.description
  
  # grab data as list of tuples
  dat1 = cursor.fetchall()
  
  # close cursor
  conn.commit()
  conn.close()
  
  # extract column names
  col1 = list(map(lambda x: x[0], desc))
  
  # let spark infer data types
  df1 = spark.createDataFrame(data=dat1, schema=col1)
  
  # return dataframe
  return df1

此代码将只支持一个结果集。如果需要MARS,请修改多个活动结果集。
有大量的注解。简而言之,连接和TSQL的信息元组作为参数传入,并返回 Dataframe 。
是的,cursor.description中有数据类型,但它们是编码的。我没有找到合适的Map。由于您不处理大型数据,请推断架构。否则,请为架构传递DDL语句而不是列标题。

#
# Make call using SELECT statement
#

# tuple of info (server, user, pwd, database, query)
info = ('svr4tips2030.database.windows.net', '<your user>', '<your pwd>', 'dbs4advwrks', 'select * from dbo.vDMPrep')

# get data frame
df2 = call_tsql(info)

上图显示了推断的类型,下图显示了数据。

我创建了一个非常简单的存储过程,它只从视图中进行选择。

CREATE PROCEDURE [dbo].[StackOverFlowTest]
AS
BEGIN
    SELECT * FROM [dbo].[vDMPrep]
END
GO

如果我们更改调用以执行此存储过程,则会得到相同的答案。

#
# Make call using EXEC statement
#

# tuple of info (server, user, pwd, database, query)
info = ('svr4tips2030.database.windows.net', '<your user>', '<your pwd>', 'dbs4advwrks', 'exec [dbo].[StackOverFlowTest]')

# get data frame
df2 = call_tsql(info)

随附的是一个示例SPARKDDL语句。请参见JSON文档的file_schema部分,它作为参数传递给笔记本。

#
# Table 1 - dim.account
#

# Set parameters for notebook
parms = {
"datalake_path": "/mnt/datalake/bronze/",
"datafile_path": "/dim/account/dim-account-20200905T101530.csv",
"debug_flag": "false",
"partition_count": "2",
"file_schema": "AccountKey INT, ParentAccountKey INT, AccountCodeAlternateKey INT, ParentAccountCodeAlternateKey INT, AccountDescription STRING, AccountType STRING, Operator STRING, CustomMembers STRING, ValueType STRING, CustomMemberOptions STRING"
}

# Run notebook with selections
ret = dbutils.notebook.run("./nb-full-load-delta-table", 60, parms)

# Show return value if any
print(ret)

概括地说,如果您遇到性能问题,请创建一个单节点集群并在该节点上扩展计算。不可以,因为pymssql模块不会使用多个节点。

相关问题