如何在aws glue pyspark中运行并行线程?

s8vozzvw  于 2021-05-27  发布在  Spark
关注(0)|答案(1)|浏览(673)

我有一个spark工作,它只需要从具有相同转换的多个表中提取数据。基本上是一个for循环,它遍历一个表列表,查询catalog表,添加一个时间戳,然后转入redshift(下面的示例)。
完成这项工作大约需要30分钟。有没有办法在相同的spark/glue环境下并行运行这些程序?如果可以避免的话,我不想创建单独的胶水作业。

import datetime
import os
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from awsglue.dynamicframe import DynamicFrame
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.context import SparkContext
from pyspark.sql.functions import *

# query the runtime arguments

args = getResolvedOptions(
    sys.argv,
    ["JOB_NAME", "redshift_catalog_connection", "target_database", "target_schema"],
)

# build the job session and context

glueContext = GlueContext(SparkContext.getOrCreate())
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)

# set the job execution timestamp

job_execution_timestamp = datetime.datetime.utcnow()

tables = []

for table in tables:
    catalog_table = glueContext.create_dynamic_frame.from_catalog(
        database="test", table_name=table, transformation_ctx=table
    )
    data_set = catalog_table.toDF().withColumn(
        "batchLoadTimestamp", lit(job_execution_timestamp)
    )

    # covert back to glue dynamic frame
    export_frame = DynamicFrame.fromDF(data_set, glueContext, "export_frame")

    # remove null rows from dynamic frame
    non_null_records = DropNullFields.apply(
        frame=export_frame, transformation_ctx="non_null_records"
    )

    temp_dir = os.path.join(args["TempDir"], redshift_table_name)

    stores_redshiftSink = glueContext.write_dynamic_frame.from_jdbc_conf(
        frame=non_null_records,
        catalog_connection=args["redshift_catalog_connection"],
        connection_options={
            "dbtable": f"{args['target_schema']}.{redshift_table_name}",
            "database": args["target_database"],
            "preactions": f"truncate table {args['target_schema']}.{redshift_table_name};",
        },
        redshift_tmp_dir=temp_dir,
        transformation_ctx="stores_redshiftSink",
    ) ```
pgccezyw

pgccezyw1#

您可以执行以下操作以加快此过程
启用作业的并发执行。
分配足够数量的dpu。
将表列表作为参数传递
使用粘合工作流或step函数并行执行作业。
现在假设您要摄取100个表,您可以将列表分成10个表,每个表同时运行作业10次。
由于您的数据将被并行加载,所以胶水作业运行的时间将减少,因此成本将降低。
另一种更快的方法是直接使用红移工具。
在redshift中创建表,并将batchloadtimestamp列保留为当前时间戳的默认值。
现在创建copy命令并直接从s3将数据加载到表中。
使用glue python shell作业pg8000运行copy命令。
为什么这种方法会更快??因为spark redshift jdbc连接器首先将sparkDataframe卸载到s3,然后准备一个复制命令到redshift表。在直接运行copy命令的同时,您消除了运行unload命令以及将数据读入spark df的开销。

相关问题