Using Python multiprocessing to execute 100+ DB2 queries

svdrlsy4 posted on 2023-08-05 in DB2

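I'm quite new to multiprocessing and threading. I'm trying to come up with a solution in which I execute multiple DB2 queries in parallel so that I get the results faster.
Here is an example: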

import concurrent.futures
import multiprocessing
import os
import sys
import ibm_db

executed_queries = []

class QueryRunner:
    def __init__(self, files):
        self.files = files

    def execute_query(self, filename):
        with ibm_db.connect("DATABASE=sample;HOSTNAME=localhost;PORT=50000;USERNAME=db2admin;PASSWORD=db2admin") as conn:
            with open(filename) as f:
                query = f.read()
            print(f"Executing query: {query}")

            for i in range(3):
                try:
                    ibm_db.exec_query(conn, query)
                    break
                except Exception as e:
                    print(f"Query failed with error: {e}")
            else:
                # Query failed after 3 retries
                print(f"Query failed after 3 retries")
                # Check if the for loop executed any iterations
                if not i:
                    # The for loop did not execute any iterations, so replace the query with a different query
                    filename = next((f for f in self.files if f not in executed_queries and f.startswith('MYqueries'))]
                    self.execute_query(filename)

    def run(self):
        with multiprocessing.Pool(processes=15) as pool:
            results = pool.starmap(self.execute_query, [(filename,) for filename in filenames])

            for result in results:
                print(f"Query complete: {result}")

if __name__ == "__main__":
    files = os.listdir('.')
    filenames = [f for f in files if f not in executed_queries and f.startswith('MYqueries')]
    query_runner = QueryRunner(filenames)
    query_runner.run()

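I keep getting a TypeError from pool.starmap().
All the queries are stored as separate .txt files in the folder MYqueries. I read them from that folder and try to execute several queries in parallel to speed things up. Any help here would be hugely appreciated.
Thanks, and if I've made a lot of basic mistakes, please point them out; I'll try to fix all of them.
After calling sys.exc_info() in the run() method, I get the following: (<class 'TypeError'>, TypeError('QueryRunner.execute_query takes 2 positional arguments but 20 were given'), <traceback object at 0x0000025F1454D680>)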

Answer 1 (tpgth1q7)

I probably have more questions for you than answers.
In the for/else statement you check:

# Check if the for loop executed any iterations
if not i:
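For reference, a for loop's else clause runs only when the loop finishes without hitting break. This sketch mimics the question's three-attempt loop without a database; retry_loop and its succeed_on parameter are hypothetical, with succeed_on simulating which attempt succeeds:

```python
def retry_loop(succeed_on, retries=3):
    """Mimic the question's retry loop without a database.

    succeed_on is a hypothetical attempt index at which the query
    "succeeds"; pass a value >= retries to make every attempt fail.
    Returns (i, else_ran): the final loop variable and whether the
    for/else branch executed.
    """
    else_ran = False
    for i in range(retries):
        if i == succeed_on:
            break  # success: the else clause is skipped
    else:
        # Reached only when the loop was not left via break.
        else_ran = True
    return i, else_ran


print(retry_loop(succeed_on=0))   # (0, False): first attempt succeeded
print(retry_loop(succeed_on=99))  # (2, True): all three attempts failed
```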

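But this code is only reached when the for loop runs to completion without a break, at which point i is 2. So I don't see how not i could ever evaluate to True. And if it did, the following statement would execute: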

filename = next((f for f in self.files if f not in executed_queries and f.startswith('MYqueries'))]


This does not even compile (note the stray closing bracket). Perhaps you meant:

filename = next(f for f in self.files if f not in executed_queries and f.startswith('MYqueries'))


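If your intent is to pick the next filename from self.files that has not been executed yet, this will not work. First, the global executed_queries cannot be shared among the processes in the pool, and even if it could, you have no code that updates the list. Besides, the run method already submits every element of filenames.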
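A minimal sketch of why the global list does not help: each pool worker is a separate process with its own copy of module-level globals, so appends made in a worker never reach the parent. The names record and run_in_processes are hypothetical, and the optional ctx parameter only exists so a specific start method can be chosen:

```python
import multiprocessing

# Module-level list, mirroring the question's global executed_queries.
executed_queries = []


def record(filename):
    # Runs in a worker process: this appends to the worker's own copy of
    # executed_queries, which the parent process never sees.
    executed_queries.append(filename)
    return filename


def run_in_processes(filenames, ctx=None):
    """Map record over filenames in a process pool.

    Returns (values returned by the workers, snapshot of the parent's list).
    """
    if ctx is None:
        ctx = multiprocessing.get_context()  # the platform's default start method
    with ctx.Pool(processes=2) as pool:
        returned = pool.map(record, filenames)
    return returned, list(executed_queries)
```

Called from under if __name__ == "__main__": (required with the spawn start method, the only one available on Windows), run_in_processes(["q1.txt", "q2.txt"]) gives back both names as return values while the parent's snapshot is still an empty list. Collecting what ran from the workers' return values is the reliable way to track it.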
In the method QueryRunner.run you have:

results = pool.starmap(self.execute_query, [(filename,) for filename in filenames])


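Where does filenames come from here? It resolves to the module-level global created in the __main__ block, not to the list you passed to the constructor. Did you mean self.files?
Why not simply: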

results = pool.map(self.execute_query, self.files)
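A likely explanation for the reported message, offered as an assumption since the posted code builds 1-tuples and may not be exactly the version that raised it: starmap unpacks each item into positional arguments, so if it receives bare strings, every character becomes a separate argument. Together with self, a 19-character file name would produce exactly "takes 2 positional arguments but 20 were given". This sketch uses ThreadPool, which shares Pool's map and starmap API, with a hypothetical one-argument worker and hypothetical file names:

```python
from multiprocessing.pool import ThreadPool


def execute_query(filename):
    # Hypothetical one-argument worker, like the bound method minus self.
    return f"ran {filename}"


names = ["MYqueries_001.txt", "MYqueries_002.txt"]  # hypothetical file names

with ThreadPool(2) as pool:
    # map: each element is passed as the single argument.
    mapped = pool.map(execute_query, names)

    # starmap: each element is unpacked, so a str becomes one arg per character.
    error = None
    try:
        pool.starmap(execute_query, names)
    except TypeError as exc:
        error = exc

print(mapped)
print(error)
```

Wrapping each name in a 1-tuple, as the posted code does, also works with starmap, but map is the simpler fit for a single-argument worker.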


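You then print every element of the results list, but execute_query implicitly returns None. Storing and printing the results makes little sense until execute_query returns something meaningful.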
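One way to give results something to report is to have the worker return a small record instead of only printing. In this sketch run_sql is a hypothetical callable standing in for the real DB2 call (for example a wrapper around ibm_db), injected so the code runs without a database; run_all is likewise an illustrative helper name:

```python
from multiprocessing.pool import ThreadPool


def execute_query(filename, run_sql, retries=3):
    """Read one SQL file and run it, returning a result record.

    run_sql is a hypothetical stand-in for the DB2 call.
    Returns (filename, succeeded, last_error).
    """
    with open(filename) as f:
        query = f.read()
    last_error = None
    for _ in range(retries):
        try:
            run_sql(query)
            return filename, True, None
        except Exception as e:  # remember the message, then retry
            last_error = str(e)
    return filename, False, last_error


def run_all(filenames, run_sql, threads=4):
    # starmap with explicit 2-tuples: one (filename, run_sql) pair per call.
    with ThreadPool(threads) as pool:
        results = pool.starmap(execute_query, [(name, run_sql) for name in filenames])
    for name, ok, err in results:
        print(f"{name}: {'ok' if ok else 'failed: ' + err}")
    return results
```

Because each record carries the file name and outcome, the caller can decide afterwards which files to rerun, without any shared state between workers.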
Then, in the main block, you have:

if __name__ == "__main__":
    files = os.listdir('.')
    filenames = [f for f in files if f not in executed_queries and f.startswith('MYqueries')]
    ...


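At that point executed_queries is still empty, so why test against it?
What I would do
Since creating a connection is expensive, I would give each thread in a thread pool its own dedicated connection. Because execute_query spends most of its time waiting on the database, multithreading is a viable and more efficient choice than multiprocessing here. You will need to determine the optimal number of threads yourself.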

import threading

import ibm_db

# DB2 CLI keywords UID/PWD; adjust the values to your server.
CONN_STR = "DATABASE=sample;HOSTNAME=localhost;PORT=50000;PROTOCOL=TCPIP;UID=db2admin;PWD=db2admin"

# Created once at module level: each thread sees its own attributes on it.
local_storage = threading.local()

def execute_query(filename):
    # Create a connection for this thread if it does not already exist:
    conn = getattr(local_storage, 'conn', None)
    if conn is None:
        # ibm_db.connect takes (dsn, user, password); credentials are in the DSN.
        conn = ibm_db.connect(CONN_STR, "", "")
        local_storage.conn = conn # remember for next time

    # No "with conn:" block: the connection stays open for reuse by this thread.
    with open(filename) as f:
        query = f.read()
    print(f"Executing query: {query}")

    for i in range(3):
        try:
            # exec_immediate is ibm_db's call for running an SQL string directly.
            ibm_db.exec_immediate(conn, query)
        except Exception as e:
            print(f"Query failed with error: {e}")
        else:
            return # Success

    # Query failed after 3 retries
    print(f"Query in {filename} failed after 3 retries")

if __name__ == "__main__":
    from multiprocessing.pool import ThreadPool
    import os

    # The question says the .txt query files live in the folder MYqueries.
    folder = 'MYqueries'
    filenames = [os.path.join(folder, f) for f in os.listdir(folder) if f.endswith('.txt')]
    # 15 threads (is there a better pool size?)
    with ThreadPool(15) as pool:
        pool.map(execute_query, filenames)


Note that, the way I have written execute_query, it can also be called directly, not only as a pool worker function.
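The per-thread connection reuse described in this answer can be exercised without a DB2 server. In this sketch fake_connect is a hypothetical stand-in for ibm_db.connect (a "connection" is just an integer id), and get_conn and opened are illustrative names; the point is that the threading.local() object is created once, at module level:

```python
import threading
from multiprocessing.pool import ThreadPool

# One namespace per thread: attributes set by one thread are invisible to others.
_local = threading.local()

opened = []                      # every "connection" created, for inspection
_opened_lock = threading.Lock()  # guards `opened` across worker threads


def fake_connect():
    # Hypothetical stand-in for ibm_db.connect so the sketch runs offline.
    with _opened_lock:
        conn = len(opened)
        opened.append(conn)
    return conn


def get_conn():
    # Open this thread's connection on first use, then reuse it.
    conn = getattr(_local, "conn", None)
    if conn is None:
        conn = fake_connect()
        _local.conn = conn
    return conn


def execute_query(filename):
    # Report which connection and which thread handled this file.
    return filename, get_conn(), threading.get_ident()


names = [f"MYqueries_{n:03}.txt" for n in range(20)]  # hypothetical file names
with ThreadPool(3) as pool:
    results = pool.map(execute_query, names)
print(f"{len(results)} queries ran on {len(opened)} connection(s)")
```

With three worker threads at most three connections are opened, however many files there are. Had the threading.local() object been created inside execute_query, every call would see a fresh, empty object and open a new connection each time.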
