赶紧的,我急道:从T-SQL(SQL Server)中的复杂查询和/或SQL Server存储过程的输出中创建Spark Dataframe 。
据我所知,Spark不允许以底层数据源的方言执行查询。是的,有a way来获取低级对象并执行存储过程,但在这种方式下,我在输出中没有Spark DF。
因此,我想以经典的pyodbc方式执行查询,获得结果,然后使用提供数据和模式的函数SparkSession.createDataFrame(data, schema=None, samplingRatio=None, verifySchema=True)构建Spark数据框架。(一对列表 (列名、数据类型))。请按照一个工作示例从SQL Server的本地示例(生成和)提取示例数据:
import pyodbc
connection_string = "Driver={SQL Server};Server=LOCALHOST;Database=master;Trusted_Connection=yes;"
db_connection = pyodbc.connect(connection_string)
sql_query = """
SET NOCOUNT ON
DECLARE @TBL_TEST AS TABLE (
column_1 INT NOT NULL PRIMARY KEY CLUSTERED IDENTITY(1, 1),
column_2 VARCHAR(10) NOT NULL,
column_3 VARCHAR(20) NULL,
column_4 INT NOT NULL
)
INSERT INTO @TBL_TEST (column_2, column_3, column_4)
VALUES
('test1_col2', 'test1_col3', 100),
('test2_col2', 'test2_col3', 200),
('test3_col2', NULL, 300)
SET NOCOUNT OFF
SELECT t.* FROM @TBL_TEST AS t
"""
cursor = db_connection.cursor()
rows = cursor.execute(sql_query).fetchall()
cursor.close()
db_connection.close()
print(rows)
如何从返回的游标中提取模式并获得一个 schema 对象以给予给createDataFrame()函数?
记住,我的目标是在题目上,所以其他方式也是欢迎的!
提前感谢!
3条答案
按热度按时间9udxz4iz1#
如果使用pyodbc,catalyst优化器生成的java字节码只作为一个节点(executor)运行,而不是整个集群。对于更大的数据集,这会妨碍集群的充分利用和性能问题。
最好使用JDBC的spark驱动程序,微软有一个。
https://learn.microsoft.com/en-us/sql/connect/spark/connector?view=sql-server-ver16
将复杂的T-SQL创建为视图,然后读取它们。这就是Spark为阅读文件所做的。使用JDBC驱动程序(spark),如果需要,它将允许您通过更改分区方法来并行读取。
为正确版本的spark安装Marven库。
我使用的是Spark版本〉3.1。
我有一个冒险作品数据库,其中有一个视图叫v。
使用JDBC驱动程序进行典型的spark.read()调用。
以下是显示 Dataframe 的结果。
数据框是严格类型化的吗?答案是肯定的,因为它从SQL Server获取字段信息。
x1c4d 1x指令集
最后但并非最不重要的一点是,视图是否复杂?下图显示了8个表被联接和聚合以获得视图的最终结果。
总之,使用数据库中的视图为Spark预编译数据集。使用Microsoft的JDBC驱动程序通过dataframe读取和写入SQL Server。
至于存储过程,有一种方法可以使用驱动程序来执行非查询。我将不得不寻找代码。请继续关注更新或第2部分。
apeeds0o2#
这是答案的第二部分。没有好的方法将存储过程调用的结果作为 Dataframe 返回。
以下是MSFT github站点上该驱动程序的链接,声明不支持存储过程。
https://github.com/microsoft/sql-spark-connector/issues/21
这是一个黑客的工作。
在我的例子中,我的SP要做一些工作,并将其保存到一个临时表中。然后使用上述技术读取该表。
下面的代码删除表(如果存在),然后重新加载。
下面是进入底层JAVA驱动程序管理器的代码。它有一个调用SP的属性。
使用spark.read()从SP填充的新表中检索数据。
ki1q1bka3#
我希望这适合你的用例。同样,这将不会伸缩,因为它运行在控制节点(执行器)。如果你有一个5节点集群,这将只运行在1个节点。
也就是说,我们可以让spark推断数据类型。如果你看pyodbc文档,你必须安装一个本地ODBC驱动程序。这对工作站上的anaconda是好的,但对spark集群是坏的。相反,用户pymssql模块是自包含的本机代码。使用集群库的PyPi部分安装。
https://learn.microsoft.com/en-us/sql/connect/python/pymssql/python-sql-driver-pymssql?view=sql-server-ver16
现在我们有了一个驱动程序,让我们编写一个模块,它将从SELECT语句或存储过程调用EXEC返回一个 Dataframe 。
此代码将只支持一个结果集。如果需要MARS,请修改多个活动结果集。
有大量的注解。简而言之,连接和TSQL的信息元组作为参数传入,并返回 Dataframe 。
是的,cursor.description中有数据类型,但它们是编码的。我没有找到合适的Map。由于您不处理大型数据,请推断架构。否则,请为架构传递DDL语句而不是列标题。
上图显示了推断的类型,下图显示了数据。
我创建了一个非常简单的存储过程,它只从视图中进行选择。
如果我们更改调用以执行此存储过程,则会得到相同的答案。
随附的是一个示例SPARKDDL语句。请参见JSON文档的file_schema部分,它作为参数传递给笔记本。
概括地说,如果您遇到性能问题,请创建一个单节点集群并在该节点上扩展计算。不可以,因为pymssql模块不会使用多个节点。