How to generate a PySpark DataFrame name dynamically

k2fxgqgv posted on 2023-02-19 in Apache

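I have a table containing the data shown in the image. I want to create DataFrames whose names are generated dynamically and store the query results in them.
For example, in the sample below I want to create two DataFrames named dnb_df and es_df, store the results of each read in them, and print the schema of each DataFrame.
When I run the following code, I get this error:
SyntaxError: cannot assign to operator (TestGlue2.py, line 66)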

import sys
import boto3
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame
from pyspark.sql.functions import regexp_replace, col

args = getResolvedOptions(sys.argv, ['JOB_NAME'])



sc = SparkContext()
#sc.setLogLevel('DEBUG')
glueContext = GlueContext(sc)
spark = glueContext.spark_session

#logger = glueContext.get_logger()
#logger.DEBUG('Hello Glue')
job = Job(glueContext)
job.init(args["JOB_NAME"], args)


client = boto3.client('glue', region_name='XXXXXX')
response = client.get_connection(Name='XXXXXX')
connection_properties = response['Connection']['ConnectionProperties']
URL = connection_properties['JDBC_CONNECTION_URL']
url_list = URL.split("/")
host = "{}".format(url_list[-2][:-5])
new_host=host.split('@',1)[1]
port = url_list[-2][-4:]
database = "{}".format(url_list[-1])
Oracle_Username = "{}".format(connection_properties['USERNAME'])
Oracle_Password = "{}".format(connection_properties['PASSWORD'])

#print("Oracle_Username:",Oracle_Username)
#print("Oracle_Password:",Oracle_Password)
print("Host:",host)
print("New Host:",new_host)
print("Port:",port)
print("Database:",database)
Oracle_jdbc_url="jdbc:oracle:thin:@//"+new_host+":"+port+"/"+database
print("Oracle_jdbc_url:",Oracle_jdbc_url)
source_df = spark.read.format("jdbc").option("url", Oracle_jdbc_url).option("dbtable", "(select * from schema.table order by VENDOR_EXECUTION_ORDER) ").option("user", Oracle_Username).option("password", Oracle_Password).load()
vendor_data=source_df.collect()
for row in vendor_data:
    vendor_query = row.SRC_QUERY
    row.VENDOR_NAME+'_df' = spark.read.format("jdbc").option("url",
        Oracle_jdbc_url).option("dbtable", vendor_query).option("user",
        Oracle_Username).option("password", Oracle_Password).load()
    print(row.VENDOR_NAME+'_df')

txu3uszq1#

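If I understand correctly, you need to generate the name VENDOR_NAME_df dynamically.
The SyntaxError occurs because the left-hand side of the assignment, row.VENDOR_NAME+'_df', is an expression rather than a name, and Python cannot assign to an expression. Beyond that, you cannot assign to a Row object, and you cannot store a DataFrame in a Row, because a DataFrame cannot have a column of type DataFrame.
However, you can convert the Row to a dict with asDict and work with that instead.
This works: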

vendor_data=source_df.collect()

for row in vendor_data:
  rowAsDict=row.asDict()
  # Replace this with spark.read() or any way to create a Dataframe
  rowAsDict[rowAsDict["VENDOR_NAME"]+"_df"] = spark.sql(rowAsDict["SOURCE_QUERY"]) 
  rowAsDict[rowAsDict["VENDOR_NAME"]+"_df"].show()

Input source_df:

Result of the source query:

Output of rowAsDict[rowAsDict["VENDOR_NAME"]+"_df"].show():

The last row as a dictionary:

{'VENDOR_NAME': 'Name1', 'SOURCE_QUERY': 'select * from view1', 'Name1_df': DataFrame[id: string, date: string, Code: string]}
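
As a variation on the approach above, the results can be collected in a single dictionary keyed by the generated name, so that every DataFrame stays reachable after the loop ends. The sketch below is only an illustration under stated assumptions: `build_named_frames`, its `load` callback, and the sample rows and table names are hypothetical and not part of the original code. A stand-in loader is used so that the snippet runs without a Spark session.

```python
from typing import Any, Callable, Dict, Iterable, Mapping


def build_named_frames(
    rows: Iterable[Mapping[str, Any]],
    load: Callable[[str], Any],
    name_key: str = "VENDOR_NAME",
    query_key: str = "SRC_QUERY",
) -> Dict[str, Any]:
    """Return {"<vendor>_df": load(query)} with one entry per row."""
    frames: Dict[str, Any] = {}
    for row in rows:
        # The dictionary key plays the role of the "dynamic variable name".
        frames[f"{row[name_key]}_df"] = load(row[query_key])
    return frames


# Hypothetical stand-in data and loader so the sketch runs without Spark.
# With Spark, rows could be [r.asDict() for r in source_df.collect()] and
# load could wrap the spark.read.format("jdbc") ... .load() call from the question.
sample_rows = [
    {"VENDOR_NAME": "dnb", "SRC_QUERY": "(select * from dnb_table) t"},
    {"VENDOR_NAME": "es", "SRC_QUERY": "(select * from es_table) t"},
]
frames = build_named_frames(sample_rows, load=lambda query: f"loaded:{query}")
print(sorted(frames))
```

With real DataFrames as values, each one could then be looked up by name, for example `frames["dnb_df"].printSchema()`, which covers the "print the schema of each DataFrame" part of the question.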
