使用Multiprocessing包插入MySQL DB时,数据重复三次:台球

r8xiu3jd  于 2023-08-02  发布在  Mysql
关注(0)|答案(2)|浏览(119)

我正在运行Airflow作业以将数据加载到表中。任务是:

  • 查询数据库->在pandas数据框中获取结果->将结果集传递给工作进程->每个工作进程处理行并将数据加载到不同的数据库中。

以下是DAG文件的简化版本

import process
from airflow.providers.mysql.hooks.mysql import MySqlHook
from airflow.operators.python import PythonOperator

LOADING = PythonOperator(
            task_id='LOADING',
            python_callable=process,
            op_kwargs={
                'source_DB': MySqlHook(mysql_conn_id='source_DB'),
                'destination_DB': MySqlHook(mysql_conn_id='destination_DB')
            },
            dag=dag,
        )

start >> LOADING >> end

字符串
这是任务的代码:

import os
import logging
import billiard as mp

CUR_DIR = os.path.abspath(os.path.dirname(__file__))

def process(source_DB, destination_DB):

    get_data = open(f"{CUR_DIR}/path/to/get_data.sql").read()

    data = source_DB.get_pandas_df(
        sql=get_data,
        parameters={}
    )

    with mp.Pool(processes=mp.cpu_count(), initializer=init_worker, initargs=(destination_DB,)) as pool:
        items = [(idx, row) for idx, row in data.iterrows()]
        pool.map(load_data, items)

def init_worker(destination_DB):
    global conn
    conn = destination_DB.get_conn()

def load_data(args):

    index, data = args
    insert_sql = open(
        f"{CUR_DIR}/path/to/insert.sql").read()

    conn.autocommit(True)
    destination_DB_cur = conn.cursor()

    params = {
        'para1': data['para1'],
        'para2': data['para2']
    }
    for word, replacement in params.items():
        insert_sql = insert_sql.replace(
            '{{' + str(word) + '}}', str(replacement))

    try:
        destination_DB_cur.execute(insert_sql)
    except Exception as e:
        print(e)
    destination_DB_cur.close()


该作业工作正常,没有任何错误,但我注意到,有时加载的数据重复3次。
我做了一些研究,有些人说这与台球库有关,其他人说我必须使用连接池来确保同步和协调。
有人能帮助我准确地理解这个问题,以及如何防止它发生吗

dkqlctbz

dkqlctbz1#

  • 当多个进程同时将数据插入数据仓库而没有进行适当的同步时,可能会发生数据重复。
  • 重复问题的一个可能原因是多处理池中的每个工作进程都在建立自己到数据仓库的连接,并独立地插入数据。这可能导致并发插入和潜在的重复。
  • 尝试建立连接池
  • 考虑使用不同的多处理库:虽然billiard是多处理库的一个分支,但它可能有一些细微的差别。您可以尝试使用标准的多处理库,看看它是否解决了数据重复问题。
jyztefdp

jyztefdp2#

在表上创建主键或唯一索引(约束)。

相关问题