pysparksql中读写api调用的并行执行

ulmd4ohb  于 2021-05-27  发布在  Spark
关注(0)|答案(2)|浏览(431)

我需要将mysql中的一组表中的增量记录以parquet格式加载到amazons3。这些表在aws mysql托管示例中的多个数据库/模式中是通用的。代码应该并行地从每个模式(有一组公共表)复制数据。
我正在使用readapi pysparksql连接到mysql示例,并读取模式的每个表的数据,并使用writeapi作为Parquet文件将结果dataframe写入s3。我在循环中为数据库中的每个表运行此操作,如下代码所示:

def load_data_to_s3(databases_df):
    db_query_properties = config['mysql-query']
    auto_id_values = config['mysql-auto-id-values']
    for row in databases_df.collect():
        for table in db_query_properties.keys():
            last_recorded_id_value = auto_id_values[table]
            select_sql = "select * from {}.{} where id>{}".format(row.database_name, table, last_recorded_id_value)
            df = spark.read.format("jdbc") \
                    .option("driver", mysql_db_properties['driver']) \
                    .option("url", row.database_connection_url) \
                    .option("dbtable", select_sql) \
                    .option("user", username) \
                    .option("password", password) \
                    .load()
            s3_path = 's3a://{}/{}/{}'.format(s3_bucket, database_dir, table)
            df.write.parquet(s3_path, mode="append")

我想知道如何将此代码扩展到emr集群中并行运行的多个数据库。请给我建议一个合适的方法。如果需要更多细节,请告诉我。

ikfrs5lh

ikfrs5lh1#

你的 list_of_databases 没有并行化。要进行并行处理,应该使用 foreach 或是Spark赋予的东西。
打开emr中的concurrent选项并为每个表发送emr步骤,或者您可以使用spark的fair调度器,它可以在内部并行地执行作业,只需对代码进行少量修改。

dgsult0t

dgsult0t2#

我可以提出两个解决方案:
1简单的方法
一次向emr提交多个作业(每个db一个作业)。如果监控是个问题,只需将失败日志写入s3或hdfs即可。
2需要更改代码位
您可以尝试使用线程来并行化从每个db提取的数据。我可以展示一个如何做的示例,但是您可能需要做更多的更改来适应您的用例。
示例实现:

import threading

def load_data_to_s3(databases_df):
    db_query_properties = config['mysql-query']
    auto_id_values = config['mysql-auto-id-values']
    for row in databases_df.collect():
        for table in db_query_properties.keys():
            last_recorded_id_value = auto_id_values[table]
            select_sql = "select * from {}.{} where id>{}".format(row.database_name, table, last_recorded_id_value)
            df = spark.read.format("jdbc") \
                    .option("driver", mysql_db_properties['driver']) \
                    .option("url", row.database_connection_url) \
                    .option("dbtable", select_sql) \
                    .option("user", username) \
                    .option("password", password) \
                    .load()
            s3_path = 's3a://{}/{}/{}'.format(s3_bucket, database_dir, table)
            df.write.parquet(s3_path, mode="append") 

threads = [threading.Thread(target=load_data_to_s3, args=(db) for db in databases_df]

for t in threads:
    t.start()

for t in threads:
    t.join()

另外,请确保使用 set('spark.scheduler.mode', 'FAIR') 财产。这将为每个数据库创建一个线程。如果要控制并行运行的线程数,请相应地修改for循环。
此外,如果要在程序中创建新作业,请将sparksession与参数一起传递。

相关问题