我需要将mysql中的一组表中的增量记录以parquet格式加载到amazons3。这些表在aws mysql托管示例中的多个数据库/模式中是通用的。代码应该并行地从每个模式(有一组公共表)复制数据。
我正在使用readapi pysparksql连接到mysql示例,并读取模式的每个表的数据,并使用writeapi作为Parquet文件将结果dataframe写入s3。我在循环中为数据库中的每个表运行此操作,如下代码所示:
def load_data_to_s3(databases_df):
db_query_properties = config['mysql-query']
auto_id_values = config['mysql-auto-id-values']
for row in databases_df.collect():
for table in db_query_properties.keys():
last_recorded_id_value = auto_id_values[table]
select_sql = "select * from {}.{} where id>{}".format(row.database_name, table, last_recorded_id_value)
df = spark.read.format("jdbc") \
.option("driver", mysql_db_properties['driver']) \
.option("url", row.database_connection_url) \
.option("dbtable", select_sql) \
.option("user", username) \
.option("password", password) \
.load()
s3_path = 's3a://{}/{}/{}'.format(s3_bucket, database_dir, table)
df.write.parquet(s3_path, mode="append")
我想知道如何将此代码扩展到emr集群中并行运行的多个数据库。请给我建议一个合适的方法。如果需要更多细节,请告诉我。
2条答案
按热度按时间ikfrs5lh1#
你的
list_of_databases
没有并行化。要进行并行处理,应该使用foreach
或是Spark赋予的东西。打开emr中的concurrent选项并为每个表发送emr步骤,或者您可以使用spark的fair调度器,它可以在内部并行地执行作业,只需对代码进行少量修改。
dgsult0t2#
我可以提出两个解决方案:
1简单的方法
一次向emr提交多个作业(每个db一个作业)。如果监控是个问题,只需将失败日志写入s3或hdfs即可。
2需要更改代码位
您可以尝试使用线程来并行化从每个db提取的数据。我可以展示一个如何做的示例,但是您可能需要做更多的更改来适应您的用例。
示例实现:
另外,请确保使用
set('spark.scheduler.mode', 'FAIR')
财产。这将为每个数据库创建一个线程。如果要控制并行运行的线程数,请相应地修改for循环。此外,如果要在程序中创建新作业,请将sparksession与参数一起传递。