如何使用AWS Glue运行任意/DDL SQL语句或存储过程

u1ehiz5o  于 2022-09-21  发布在  Spark
关注(0)|答案(5)|浏览(167)

是否可以从AWS Glue python作业执行任意SQL命令(如ALTER TABLE)?我知道我可以使用它从表中读取数据,但有没有办法执行其他特定于数据库的命令?

我需要将数据输入到目标数据库中,然后立即运行一些ALTER命令。

nbnkbykc

nbnkbykc1#

因此,在进行了广泛的研究并与AWS Support一起打开了一个案例后,他们告诉我,目前不可能从PythonShell或Glue pysppark工作。但我刚刚尝试了一些有创意的东西,而且奏效了!我们的想法是使用Sparks已经依赖的py4j,并使用标准的Java SQL包。

这种方法有两个巨大的好处:

1.这样做的一个巨大好处是,您可以将您的数据库连接定义为Glue数据连接,并在其中保留JDBC详细信息和凭据,而无需在Glue代码中对它们进行硬编码。下面的示例通过调用glueContext.extract_jdbc_conf('your_glue_data_connection_name')来获取在Glue中定义的JDBC URL和凭据来实现这一点。
1.如果您需要在受支持的开箱即用的Glue数据库上运行SQL命令,您甚至不需要为该数据库使用/传递JDBC驱动程序-只需确保您为该数据库设置了Glue连接,并将该连接添加到您的Glue作业-Glue将上载正确的数据库驱动程序JAR。

请记住,下面的代码由驱动程序进程执行,不能由Spark Worker/Executor执行。

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session

logger = glueContext.get_logger()

job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# dw-poc-dev spark test

source_jdbc_conf = glueContext.extract_jdbc_conf('your_glue_database_connection_name')

from py4j.java_gateway import java_import
java_import(sc._gateway.jvm,"java.sql.Connection")
java_import(sc._gateway.jvm,"java.sql.DatabaseMetaData")
java_import(sc._gateway.jvm,"java.sql.DriverManager")
java_import(sc._gateway.jvm,"java.sql.SQLException")

conn = sc._gateway.jvm.DriverManager.getConnection(source_jdbc_conf.get('url'), source_jdbc_conf.get('user'), source_jdbc_conf.get('password'))

print(conn.getMetaData().getDatabaseProductName())

# call stored procedure, in this case I call sp_start_job

cstmt = conn.prepareCall("{call dbo.sp_start_job(?)}");
cstmt.setString("job_name", "testjob");
results = cstmt.execute();

conn.close()
xxhby3vn

xxhby3vn2#

几个小时后,我终于让它工作了,所以希望下面的内容会有帮助。我的剧本很大程度上受到了之前的回复的影响,谢谢。

前提条件:

  • 您需要在尝试任何脚本之前配置和测试粘合连接。
  • 设置您的AWS Glue作业时,请使用Spark、Glue版本2.0或更高版本以及Python版本3。
  • 我建议仅为2个工作线程配置此作业,以节省成本;大部分工作将由数据库完成,而不是胶水。
  • 以下内容使用AWS RDS PostgreSQL示例进行了测试,但希望足够灵活,可用于其他数据库。
  • 脚本需要在脚本顶部附近更新3个参数(GLUE_CONNECTION_NAME、DATABASE_NAME和STORED_PROC)。
  • JOB_NAME、连接字符串和凭据由脚本检索,不需要提供。
  • 如果您存储的proc将返回一个数据集,则将ecuteUpdate替换为ecuteQuery。
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

glue_connection_name = '[Name of your glue connection (not the job name)]'
database_name = '[name of your postgreSQL database]'
stored_proc = '[Stored procedure call, for example public.mystoredproc()]'

# Below this point no changes should be necessary.

args = getResolvedOptions(sys.argv, ['JOB_NAME'])
glue_job_name = args['JOB_NAME']

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(glue_job_name, args)
job.commit()

logger = glueContext.get_logger()

logger.info('Getting details for connection ' + glue_connection_name)
source_jdbc_conf = glueContext.extract_jdbc_conf(glue_connection_name)

from py4j.java_gateway import java_import
java_import(sc._gateway.jvm,"java.sql.Connection")
java_import(sc._gateway.jvm,"java.sql.DatabaseMetaData")
java_import(sc._gateway.jvm,"java.sql.DriverManager")
java_import(sc._gateway.jvm,"java.sql.SQLException")

conn = sc._gateway.jvm.DriverManager.getConnection(source_jdbc_conf.get('url') + '/' + database_name, source_jdbc_conf.get('user'), source_jdbc_conf.get('password'))
logger.info('Connected to ' + conn.getMetaData().getDatabaseProductName() + ', ' + source_jdbc_conf.get('url') + '/' + database_name)

stmt = conn.createStatement();
rs = stmt.executeUpdate('call ' + stored_proc);

logger.info("Finished")
gijlo24d

gijlo24d3#

我修改了米什金共享的代码,但它对我不起作用。所以,在排除了一些故障后,我意识到目录中的连接对我来说是不起作用的。所以我不得不手动修改它,并对代码进行了一些调整。现在,因为它不能将Java结果转换成Python结果,所以最后出现了ITS Working但Trouwoiung异常。我做了一项工作,所以要谨慎使用。

below is my code. 

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [TempDir, JOB_NAME]

args = getResolvedOptions(sys.argv, ['TempDir','JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# source_jdbc_conf = glueContext.extract_jdbc_conf('redshift_publicschema')

from py4j.java_gateway import java_import
java_import(sc._gateway.jvm,"java.sql.Connection")
java_import(sc._gateway.jvm,"java.sql.DatabaseMetaData")
java_import(sc._gateway.jvm,"java.sql.DriverManager")
java_import(sc._gateway.jvm,"java.sql.SQLException")

print('Trying to connect to DB')
conn = sc._gateway.jvm.DriverManager.getConnection('jdbc:redshift://redshift-cluster-2-url:4000/databasename', 'myusername', 'mypassword')

print('Trying to connect to DB success!')

print(conn.getMetaData().getDatabaseProductName())

# call stored procedure, in this case I call sp_start_job

stmt = conn.createStatement();

# cstmt = conn.prepareCall("call dbname.schemaname.my_storedproc();");

print('Call to proc trying ')

# cstmt.setString("job_name", "testjob");

try:
  rs = stmt.executeQuery('call mySchemaName.my_storedproc()');
except:
  print("An exception occurred but proc has run")

# results = cstmt.execute();`enter code here`

conn.close()
bn31dyow

bn31dyow4#

那得看情况。如果使用红移作为目标,则可以选择将前操作和后操作指定为连接选项的一部分。您将能够在那里指定更改操作。但是,对于其余的目标类型,您可能需要使用一些像pg8000(在postgres情况下)和其他一些Python模块

qjp7pelc

qjp7pelc5#

如果将连接对象附加到粘合作业,则可以轻松获得连接设置:

glue_client = boto3.client('glue')
getjob=glue_client.get_job(JobName=args["JOB_NAME"])
connection_settings = glue_client.get_connection(Name=getjob['Job']['Connections']['Connections'][0])
conn_name = connection_settings['Connection']['Name']
df = glueContext.extract_jdbc_conf(conn_name)

相关问题