AWS Glue:如何在 SQL Server 中执行存储过程并获取其返回的数据

c86crjj0  于 2022-11-21  发布在  SQL Server
关注(0)|答案(1)|浏览(197)

我只能执行存储过程,但无法获取它返回的数据。
执行后得到的只是值 true。
下图是在 DBeaver 中调用该存储过程的情况。[图片:存储过程]

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

# Glue job: connect to SQL Server through the JVM's JDBC DriverManager (via Py4J),
# run a stored procedure and read the rows it returns.
args = getResolvedOptions(sys.argv, ['TempDir','JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

from py4j.java_gateway import java_import
java_import(sc._gateway.jvm,"java.sql.Connection")
java_import(sc._gateway.jvm,"java.sql.DatabaseMetaData")
java_import(sc._gateway.jvm,"java.sql.DriverManager")
java_import(sc._gateway.jvm,"java.sql.SQLException")

print('Trying to connect to DB')
# Connection settings come from the Glue connection named 'sgc_con'.
source_jdbc_conf = glueContext.extract_jdbc_conf('sgc_con')
conn = sc._gateway.jvm.DriverManager.getConnection(
    source_jdbc_conf.get('url'),
    source_jdbc_conf.get('user'),
    source_jdbc_conf.get('password'),
)
print('Trying to connect to DB success!')
try:
    print(conn.getMetaData())
    print('prepareCall')
    statement = "EXEC MNA.dbo.zz_MNAvArticulosListar"
    exec_statement = conn.prepareCall(statement)
    try:
        print('execute')
        # execute() returns only a boolean flag (True = the first result is a
        # ResultSet), never the rows themselves -- that is why it printed True.
        has_result_set = exec_statement.execute()
        print(has_result_set)
        # Skip any update counts emitted before the first result set
        # (getUpdateCount() == -1 means there are no more results at all).
        while not has_result_set and exec_statement.getUpdateCount() != -1:
            has_result_set = exec_statement.getMoreResults()

        column_names = []
        rows = []
        if has_result_set:
            rs = exec_statement.getResultSet()
            try:
                meta = rs.getMetaData()
                column_count = meta.getColumnCount()
                # JDBC column indexes are 1-based.
                column_names = [meta.getColumnName(i) for i in range(1, column_count + 1)]
                while rs.next():
                    rows.append(tuple(rs.getString(i) for i in range(1, column_count + 1)))
            finally:
                rs.close()
        print(column_names)
        print(rows)
    finally:
        exec_statement.close()
finally:
    # Always release the JDBC connection, even if the call fails.
    conn.close()

到目前为止,我只能连接到 SQL Server 并执行存储过程。

f4t66c6m

f4t66c6m1#

修复后,我已经可以获取从 AWS Glue 中执行的 SQL Server 存储过程所返回的数据。
接下来我会继续改进该作业,使其无需手写列名,而是动态获取列名。

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql.types import *

# Glue job: run a SQL Server stored procedure over JDBC (via Py4J) and load its
# result set into a Spark DataFrame. Column names are read from the result-set
# metadata, so no column list has to be hard-coded.
args = getResolvedOptions(sys.argv, ['TempDir','JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

from py4j.java_gateway import java_import
java_import(sc._gateway.jvm,"java.sql.Connection")
java_import(sc._gateway.jvm,"java.sql.DatabaseMetaData")
java_import(sc._gateway.jvm,"java.sql.DriverManager")
java_import(sc._gateway.jvm,"java.sql.SQLException")

print('Trying to connect to DB')
# Connection settings come from the Glue connection named 'sgc_con'.
source_jdbc_conf = glueContext.extract_jdbc_conf('sgc_con')
con = sc._gateway.jvm.DriverManager.getConnection(
    source_jdbc_conf.get('url'),
    source_jdbc_conf.get('user'),
    source_jdbc_conf.get('password'),
)
print('Trying to connect to DB success!')
try:
    print(con.getMetaData())
    print('prepareCall')
    stmt = con.prepareStatement("EXEC MNA.dbo.zz_MNAvArticulosListar")
    try:
        # executeQuery() expects the procedure's first result to be a result set.
        # NOTE(review): if the procedure emits update counts first (SET NOCOUNT
        # OFF), use execute()/getMoreResults() instead -- confirm.
        rs = stmt.executeQuery()
        try:
            meta = rs.getMetaData()
            column_count = meta.getColumnCount()
            print(column_count)

            # Column names, read dynamically (JDBC indexes are 1-based).
            columns = [meta.getColumnName(i) for i in range(1, column_count + 1)]
            print(columns)

            # Every row is read column-by-column as a string. All rows are
            # collected on the driver, so this suits modest result sizes only.
            data = []
            while rs.next():
                data.append(tuple(rs.getString(i) for i in range(1, column_count + 1)))
            print(data)
        finally:
            rs.close()
    finally:
        stmt.close()
finally:
    # Always release the JDBC connection, even if the procedure call fails.
    con.close()

# Build the schema explicitly (all strings, matching getString()). Unlike
# inference from a bare name list, this also works when zero rows are returned.
schema = StructType([StructField(name, StringType(), True) for name in columns])

# Create final dataframe
second_df = spark.createDataFrame(data, schema)
second_df.printSchema()
second_df.show()

[图片:输出的 DataFrame]

相关问题