是否可以从AWS Glue python作业执行任意SQL命令(如ALTER TABLE)?我知道我可以使用它从表中读取数据,但有没有办法执行其他特定于数据库的命令?
我需要将数据输入到目标数据库中,然后立即运行一些ALTER命令。
nbnkbykc1#
因此,在进行了广泛的研究并与AWS Support一起打开了一个案例后,他们告诉我,目前不可能从PythonShell或Glue pysppark工作。但我刚刚尝试了一些有创意的东西,而且奏效了!我们的想法是使用Sparks已经依赖的py4j,并使用标准的Java SQL包。
这种方法有两个巨大的好处:
1.这样做的一个巨大好处是,您可以将您的数据库连接定义为Glue数据连接,并在其中保留JDBC详细信息和凭据,而无需在Glue代码中对它们进行硬编码。下面的示例通过调用glueContext.extract_jdbc_conf('your_glue_data_connection_name')来获取在Glue中定义的JDBC URL和凭据来实现这一点。1.如果您需要在受支持的开箱即用的Glue数据库上运行SQL命令,您甚至不需要为该数据库使用/传递JDBC驱动程序-只需确保您为该数据库设置了Glue连接,并将该连接添加到您的Glue作业-Glue将上载正确的数据库驱动程序JAR。
glueContext.extract_jdbc_conf('your_glue_data_connection_name')
请记住,下面的代码由驱动程序进程执行,不能由Spark Worker/Executor执行。
import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job args = getResolvedOptions(sys.argv, ['JOB_NAME']) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session logger = glueContext.get_logger() job = Job(glueContext) job.init(args['JOB_NAME'], args) # dw-poc-dev spark test source_jdbc_conf = glueContext.extract_jdbc_conf('your_glue_database_connection_name') from py4j.java_gateway import java_import java_import(sc._gateway.jvm,"java.sql.Connection") java_import(sc._gateway.jvm,"java.sql.DatabaseMetaData") java_import(sc._gateway.jvm,"java.sql.DriverManager") java_import(sc._gateway.jvm,"java.sql.SQLException") conn = sc._gateway.jvm.DriverManager.getConnection(source_jdbc_conf.get('url'), source_jdbc_conf.get('user'), source_jdbc_conf.get('password')) print(conn.getMetaData().getDatabaseProductName()) # call stored procedure, in this case I call sp_start_job cstmt = conn.prepareCall("{call dbo.sp_start_job(?)}"); cstmt.setString("job_name", "testjob"); results = cstmt.execute(); conn.close()
xxhby3vn2#
几个小时后,我终于让它工作了,所以希望下面的内容会有帮助。我的剧本很大程度上受到了之前的回复的影响,谢谢。
前提条件:
import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job glue_connection_name = '[Name of your glue connection (not the job name)]' database_name = '[name of your postgreSQL database]' stored_proc = '[Stored procedure call, for example public.mystoredproc()]' # Below this point no changes should be necessary. args = getResolvedOptions(sys.argv, ['JOB_NAME']) glue_job_name = args['JOB_NAME'] sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(glue_job_name, args) job.commit() logger = glueContext.get_logger() logger.info('Getting details for connection ' + glue_connection_name) source_jdbc_conf = glueContext.extract_jdbc_conf(glue_connection_name) from py4j.java_gateway import java_import java_import(sc._gateway.jvm,"java.sql.Connection") java_import(sc._gateway.jvm,"java.sql.DatabaseMetaData") java_import(sc._gateway.jvm,"java.sql.DriverManager") java_import(sc._gateway.jvm,"java.sql.SQLException") conn = sc._gateway.jvm.DriverManager.getConnection(source_jdbc_conf.get('url') + '/' + database_name, source_jdbc_conf.get('user'), source_jdbc_conf.get('password')) logger.info('Connected to ' + conn.getMetaData().getDatabaseProductName() + ', ' + source_jdbc_conf.get('url') + '/' + database_name) stmt = conn.createStatement(); rs = stmt.executeUpdate('call ' + stored_proc); logger.info("Finished")
gijlo24d3#
我修改了米什金共享的代码,但它对我不起作用。所以,在排除了一些故障后,我意识到目录中的连接对我来说是不起作用的。所以我不得不手动修改它,并对代码进行了一些调整。现在,因为它不能将Java结果转换成Python结果,所以最后出现了ITS Working但Trouwoiung异常。我做了一项工作,所以要谨慎使用。
below is my code. import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job ## @params: [TempDir, JOB_NAME] args = getResolvedOptions(sys.argv, ['TempDir','JOB_NAME']) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) # source_jdbc_conf = glueContext.extract_jdbc_conf('redshift_publicschema') from py4j.java_gateway import java_import java_import(sc._gateway.jvm,"java.sql.Connection") java_import(sc._gateway.jvm,"java.sql.DatabaseMetaData") java_import(sc._gateway.jvm,"java.sql.DriverManager") java_import(sc._gateway.jvm,"java.sql.SQLException") print('Trying to connect to DB') conn = sc._gateway.jvm.DriverManager.getConnection('jdbc:redshift://redshift-cluster-2-url:4000/databasename', 'myusername', 'mypassword') print('Trying to connect to DB success!') print(conn.getMetaData().getDatabaseProductName()) # call stored procedure, in this case I call sp_start_job stmt = conn.createStatement(); # cstmt = conn.prepareCall("call dbname.schemaname.my_storedproc();"); print('Call to proc trying ') # cstmt.setString("job_name", "testjob"); try: rs = stmt.executeQuery('call mySchemaName.my_storedproc()'); except: print("An exception occurred but proc has run") # results = cstmt.execute();`enter code here` conn.close()
bn31dyow4#
那得看情况。如果使用红移作为目标,则可以选择将前操作和后操作指定为连接选项的一部分。您将能够在那里指定更改操作。但是,对于其余的目标类型,您可能需要使用一些像pg8000(在postgres情况下)和其他一些Python模块
qjp7pelc5#
如果将连接对象附加到粘合作业,则可以轻松获得连接设置:
glue_client = boto3.client('glue') getjob=glue_client.get_job(JobName=args["JOB_NAME"]) connection_settings = glue_client.get_connection(Name=getjob['Job']['Connections']['Connections'][0]) conn_name = connection_settings['Connection']['Name'] df = glueContext.extract_jdbc_conf(conn_name)
5条答案
按热度按时间nbnkbykc1#
因此,在进行了广泛的研究并与AWS Support一起打开了一个案例后,他们告诉我,目前不可能从PythonShell或Glue pysppark工作。但我刚刚尝试了一些有创意的东西,而且奏效了!我们的想法是使用Sparks已经依赖的py4j,并使用标准的Java SQL包。
这种方法有两个巨大的好处:
1.这样做的一个巨大好处是,您可以将您的数据库连接定义为Glue数据连接,并在其中保留JDBC详细信息和凭据,而无需在Glue代码中对它们进行硬编码。下面的示例通过调用
glueContext.extract_jdbc_conf('your_glue_data_connection_name')
来获取在Glue中定义的JDBC URL和凭据来实现这一点。1.如果您需要在受支持的开箱即用的Glue数据库上运行SQL命令,您甚至不需要为该数据库使用/传递JDBC驱动程序-只需确保您为该数据库设置了Glue连接,并将该连接添加到您的Glue作业-Glue将上载正确的数据库驱动程序JAR。
请记住,下面的代码由驱动程序进程执行,不能由Spark Worker/Executor执行。
xxhby3vn2#
几个小时后,我终于让它工作了,所以希望下面的内容会有帮助。我的剧本很大程度上受到了之前的回复的影响,谢谢。
前提条件:
gijlo24d3#
我修改了米什金共享的代码,但它对我不起作用。所以,在排除了一些故障后,我意识到目录中的连接对我来说是不起作用的。所以我不得不手动修改它,并对代码进行了一些调整。现在,因为它不能将Java结果转换成Python结果,所以最后出现了ITS Working但Trouwoiung异常。我做了一项工作,所以要谨慎使用。
bn31dyow4#
那得看情况。如果使用红移作为目标,则可以选择将前操作和后操作指定为连接选项的一部分。您将能够在那里指定更改操作。但是,对于其余的目标类型,您可能需要使用一些像pg8000(在postgres情况下)和其他一些Python模块
qjp7pelc5#
如果将连接对象附加到粘合作业,则可以轻松获得连接设置: