我有一个表,其中有数据如图所示。我想创建存储结果动态生成的数据框名称。
例如,在下面的示例中,我希望创建两个不同的 Dataframe 名称dnb_df和es_df,并将读取结果存储在这两个帧中以及每个 Dataframe 的打印结构中
当我运行以下代码时出错
语法错误:无法分配给运算符(TestGlue2.py,第66行)
import sys
import boto3
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame
from pyspark.sql.functions import regexp_replace, col
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
#sc.setLogLevel('DEBUG')
glueContext = GlueContext(sc)
spark = glueContext.spark_session
#logger = glueContext.get_logger()
#logger.DEBUG('Hello Glue')
job = Job(glueContext)
job.init(args["JOB_NAME"], args)
client = boto3.client('glue', region_name='XXXXXX')
response = client.get_connection(Name='XXXXXX')
connection_properties = response['Connection']['ConnectionProperties']
URL = connection_properties['JDBC_CONNECTION_URL']
url_list = URL.split("/")
host = "{}".format(url_list[-2][:-5])
new_host=host.split('@',1)[1]
port = url_list[-2][-4:]
database = "{}".format(url_list[-1])
Oracle_Username = "{}".format(connection_properties['USERNAME'])
Oracle_Password = "{}".format(connection_properties['PASSWORD'])
#print("Oracle_Username:",Oracle_Username)
#print("Oracle_Password:",Oracle_Password)
print("Host:",host)
print("New Host:",new_host)
print("Port:",port)
print("Database:",database)
Oracle_jdbc_url="jdbc:oracle:thin:@//"+new_host+":"+port+"/"+database
print("Oracle_jdbc_url:",Oracle_jdbc_url)
source_df = spark.read.format("jdbc").option("url", Oracle_jdbc_url).option("dbtable", "(select * from schema.table order by VENDOR_EXECUTION_ORDER) ").option("user", Oracle_Username).option("password", Oracle_Password).load()
vendor_data=source_df.collect()
for row in vendor_data :
vendor_query=row.SRC_QUERY
row.VENDOR_NAME+'_df'= spark.read.format("jdbc").option("url",
Oracle_jdbc_url).option("dbtable", vendor_query).option("user",
Oracle_Username).option("password", Oracle_Password).load()
print(row.VENDOR_NAME+'_df')
1条答案
按热度按时间txu3uszq1#
如果我理解正确的话,您需要动态生成
VENDOR_NAME_DF
。您将无法分配给行对象,也无法将 Dataframe 分配给行,因为您无法创建具有Dataframe类型列的Dataframe。
不过,您可以使用
asDict
将行转换为dict,然后使用它。这是可行的:
输入源_DF:
源查询的结果:
输出(
rowAsDict[rowAsDict["VENDOR_NAME"]+"_df"].show()
的):最后一行作为字典: