我对多处理和线程还很陌生。我正在尝试提出一个解决方案,在这个方案中,我必须并行执行多个db2查询,以便能够快速获得结果。
下面是一个例子:
import concurrent.futures
import multiprocessing
import os
import sys
import ibm_db
executed_queries = []
class QueryRunner:
def __init__(self, files):
self.files = files
def execute_query(self, filename):
with ibm_db.connect("DATABASE=sample;HOSTNAME=localhost;PORT=50000;USERNAME=db2admin;PASSWORD=db2admin") as conn:
with open(filename) as f:
query = f.read()
print(f"Executing query: {query}")
for i in range(3):
try:
ibm_db.exec_query(conn, query)
break
except Exception as e:
print(f"Query failed with error: {e}")
else:
# Query failed after 3 retries
print(f"Query failed after 3 retries")
# Check if the for loop executed any iterations
if not i:
# The for loop did not execute any iterations, so replace the query with a different query
filename = next((f for f in self.files if f not in executed_queries and f.startswith('MYqueries'))]
self.execute_query(filename)
def run(self):
with multiprocessing.Pool(processes=15) as pool:
results = pool.starmap(self.execute_query, [(filename,) for filename in filenames])
for result in results:
print(f"Query complete: {result}")
if __name__ == "__main__":
files = os.listdir('.')
filenames = [f for f in files if f not in executed_queries and f.startswith('MYqueries')]
query_runner = QueryRunner(filenames)
query_runner.run()
字符串
我在pool.starmap()中不断得到TypeError
我把所有的查询作为单独的txt文件存储在文件夹MYqueries中。我正在从文件夹中阅读,并尝试并行执行多个查询以加快操作速度。在这方面,任何帮助都是巨大的。
谢谢,如果我犯了很多基本错误,请指出。我会尝试修改所有这些。
在run()方法中使用sys.exc_info()
后,我得到以下结果:(<class 'TypeError'>, TypeError('QueryRunner.execute_query takes 2 positional arguments but 20 were given'), <traceback object at 0x0000025F1454D680>)
个
1条答案
按热度按时间tpgth1q71#
我想问你的问题可能比答案还多:
在
for/else
语句中检查:字符串
但是,只有当
for
循环运行完成,i
的值为2时,才能得到这段代码。所以我看不出not i
会如何计算为True
。但如果它的值为True
,则会出现以下语句:型
这并不编译。也许你的意思是:
型
如果您的目的是从尚未执行的
self.files
中获取下一个filename
值,则此操作不起作用。首先,全局executed_queries
不能在池中的进程之间共享,即使它是可共享的,您也没有代码来更新此列表。此外,run
方法已经在尝试filenames
的每个元素。您在方法
QueryRunner.run
中有:型
是否定义了
filenames
?你是说self.filenames
吗?为什么不干脆:
型
然后打印出
results
列表中的每个元素,但execute_query
隐式返回None
。在execute_query
返回有意义的结果之前,存储和打印结果没有多大意义。然后,您可以:
型
此时
executed_queries
是空的,那么为什么还要测试它呢?"我会做什么"
由于创建连接是有成本的,因此我将为多线程池中的每个线程创建一个专用连接(由于
execute_query
大部分时间都在等待,因此多线程是一个可行的、比多处理更有效的选择)。您需要确定最佳线程数。型
请注意,按照我编写
execute_query
的方式,它可以在不作为池工作函数的情况下被调用。