我正在运行Airflow作业以将数据加载到表中。任务是:
- 查询数据库->在pandas数据框中获取结果->将结果集传递给工作进程->每个工作进程处理行并将数据加载到不同的数据库中。
以下是DAG文件的简化版本
import process
from airflow.providers.mysql.hooks.mysql import MySqlHook
from airflow.operators.python import PythonOperator
LOADING = PythonOperator(
task_id='LOADING',
python_callable=process,
op_kwargs={
'source_DB': MySqlHook(mysql_conn_id='source_DB'),
'destination_DB': MySqlHook(mysql_conn_id='destination_DB')
},
dag=dag,
)
start >> LOADING >> end
字符串
这是任务的代码:
import os
import logging
import billiard as mp
CUR_DIR = os.path.abspath(os.path.dirname(__file__))
def process(source_DB, destination_DB):
get_data = open(f"{CUR_DIR}/path/to/get_data.sql").read()
data = source_DB.get_pandas_df(
sql=get_data,
parameters={}
)
with mp.Pool(processes=mp.cpu_count(), initializer=init_worker, initargs=(destination_DB,)) as pool:
items = [(idx, row) for idx, row in data.iterrows()]
pool.map(load_data, items)
def init_worker(destination_DB):
global conn
conn = destination_DB.get_conn()
def load_data(args):
index, data = args
insert_sql = open(
f"{CUR_DIR}/path/to/insert.sql").read()
conn.autocommit(True)
destination_DB_cur = conn.cursor()
params = {
'para1': data['para1'],
'para2': data['para2']
}
for word, replacement in params.items():
insert_sql = insert_sql.replace(
'{{' + str(word) + '}}', str(replacement))
try:
destination_DB_cur.execute(insert_sql)
except Exception as e:
print(e)
destination_DB_cur.close()
型
该作业工作正常,没有任何错误,但我注意到,有时加载的数据重复3次。
我做了一些研究,有些人说这与台球库有关,其他人说我必须使用连接池来确保同步和协调。
有人能帮助我准确地理解这个问题,以及如何防止它发生吗
2条答案
按热度按时间dkqlctbz1#
jyztefdp2#
在表上创建主键或唯一索引(约束)。