下面是我正在使用的代码。目前我的目标是根据线程的数量将每个块划分为更小的部分,这应该在线程之间提供更好的负载平衡。我是在Python中使用线程的新手,所以我不确定我所做的是否是最佳的。
下面是我创建的代码:
#so far this is the most optimal
import heapq
import re
import time
import psutil
from collections import defaultdict
import requests
import tracemalloc
from concurrent.futures import ThreadPoolExecutor
import os
# Load the NLTK English stop-word list once at import time; count_words()
# consults this set to decide which tokens to skip.
stop_words_url = "https://gist.githubusercontent.com/sebleier/554280/raw/7e0e4a1ce04c2bb7bd41089c9821dbcf6d0c786c/NLTK's%2520list%2520of%2520english%2520stopwords"
# A timeout keeps a stalled connection from hanging the script forever, and
# raise_for_status() prevents an HTTP error page from being split into
# "stop words" and silently skewing every count.
_stop_words_response = requests.get(stop_words_url, timeout=30)
_stop_words_response.raise_for_status()
stop_words = set(_stop_words_response.text.split())
def tokenize(line):
    """Return the word tokens found in *line*, in order of appearance.

    A token is a maximal run of ``\\w`` characters delimited by word
    boundaries, so punctuation and whitespace are dropped and an
    apostrophe splits a word in two.
    """
    word_pattern = re.compile(r'\b\w+\b')
    return word_pattern.findall(line)
def count_words(chunk):
    """Tally the non-stop-word tokens in an iterable of text lines.

    Each line is split with ``tokenize``. A token is skipped when its
    lower-cased form is in ``stop_words``; otherwise it is counted under
    its original spelling, so the stop-word test is case-insensitive
    while the counts themselves are case-sensitive.

    Returns a ``defaultdict(int)`` mapping token -> occurrences.
    """
    tallies = defaultdict(int)
    for text_line in chunk:
        for token in tokenize(text_line):
            if token.lower() in stop_words:
                continue
            tallies[token] += 1
    return tallies
def top_k_words(word_count, k):
    """Return the *k* most frequent ``(word, count)`` pairs, highest first.

    *word_count* is a mapping of word -> count. Fewer than *k* pairs are
    returned when the mapping holds fewer entries.
    """
    def _frequency(entry):
        # entry is a (word, count) pair; rank by the count.
        return entry[1]

    pairs = word_count.items()
    return heapq.nlargest(k, pairs, key=_frequency)
def _merge_chunk_counts(executor, chunk, num_threads, totals):
    """Count the words of *chunk* on *executor* and add them into *totals*."""
    # Stride slicing hands each worker an interleaved share of the lines,
    # so every task receives roughly the same number of lines.
    futures = [
        executor.submit(count_words, chunk[offset::num_threads])
        for offset in range(num_threads)
    ]
    for future in futures:
        for word, count in future.result().items():
            totals[word] += count


def analyze_performance(file_path, k=10, chunk_size=10000, num_threads=os.cpu_count()):
    """Count words in a text file with a thread pool and report run metrics.

    The file is read line by line. Every *chunk_size* lines the buffered
    chunk is split into *num_threads* interleaved slices, each slice is
    counted by ``count_words`` on a worker thread, and the partial counts
    are merged before reading continues. A trailing partial chunk is
    handled the same way.

    NOTE: ``count_words`` is pure-Python, CPU-bound work, so on a standard
    (GIL) CPython build adding threads is not expected to speed it up.

    Args:
        file_path: Path of the UTF-8 text file to analyse.
        k: Number of most frequent words to print.
        chunk_size: Number of lines buffered before they are dispatched.
        num_threads: Worker threads, and slices per chunk. ``None`` falls
            back to ``os.cpu_count()``, or 1 if that is unknown.

    Returns:
        ``(elapsed_time, cpu_percent, peak)``: wall-clock seconds, the
        system-wide CPU utilisation reported by psutil for the run, and
        the peak number of bytes traced by ``tracemalloc``.
    """
    if num_threads is None:
        # os.cpu_count() may return None when the core count is unknown.
        num_threads = os.cpu_count() or 1

    # Prime psutil: a non-blocking cpu_percent() reports utilisation since
    # the previous call, so this makes the final reading cover this run only.
    psutil.cpu_percent()
    start_time = time.perf_counter()
    tracemalloc.start()
    try:
        word_count = defaultdict(int)
        # One pool for the whole file; building and tearing down a pool
        # for every chunk dominates the runtime when chunk_size is small.
        with ThreadPoolExecutor(max_workers=num_threads) as executor:
            with open(file_path, 'r', encoding='utf-8') as file:
                chunk = []
                for line in file:
                    chunk.append(line)
                    if len(chunk) == chunk_size:
                        _merge_chunk_counts(executor, chunk, num_threads, word_count)
                        chunk = []
                if chunk:
                    # Lines left over after the last full chunk.
                    _merge_chunk_counts(executor, chunk, num_threads, word_count)
        top_k = top_k_words(word_count, k)
        _, peak = tracemalloc.get_traced_memory()
    finally:
        # Always stop tracing, even if reading or counting raised.
        tracemalloc.stop()
    elapsed_time = time.perf_counter() - start_time
    cpu_percent = psutil.cpu_percent()
    print(f"Top {k} words: {top_k}")
    print(f"Elapsed time: {elapsed_time:.2f} seconds")
    print(f"CPU usage: {cpu_percent}%")
    return elapsed_time, cpu_percent, peak
if __name__ == "__main__":
    # Benchmark driver: run analyze_performance for every combination of
    # chunk size and thread count and collect the metrics in a DataFrame.
    # pandas is only needed to tabulate the results, so it is imported here.
    import pandas as pd

    file_paths = [
        "small_50MB_dataset.txt",
    ]
    chunk_sizes = [10, 100, 1000, 10000, 100000, 1000000, 10000000, 100000000, 1000000000, 10000000000]
    # os.cpu_count() may return None; fall back to a single thread.
    max_threads = os.cpu_count() or 1
    # One dict per benchmark run; this list was previously never created.
    results = []
    for file_path in file_paths:
        print(f"Processing {file_path}")
        for chunk_size in chunk_sizes:
            for num_threads in range(1, max_threads + 1):
                # NOTE(review): chunk_size is a number of lines, so the
                # "MB" figure printed here is not a size in megabytes.
                print("Partition Size:", chunk_size / (1024 * 1024), "MB", "chunk size of:", chunk_size)
                elapsed_time, cpu_usage, memory_usage = analyze_performance(
                    file_path, chunk_size=chunk_size, num_threads=num_threads
                )
                print("\n")
                results.append({
                    "chunk_size": chunk_size,
                    "num_threads": num_threads,
                    "elapsed_time": elapsed_time,
                    "cpu_usage": cpu_usage,
                    # Peak traced bytes converted to megabytes (10**6 bytes).
                    "memory_usage": float(memory_usage / 10**6),
                })
    # Create pandas DataFrame from the results
    df2 = pd.DataFrame(results)
输出如下:
+----+--------------+---------------+----------------+-------------+----------------+
| | chunk_size | num_threads | elapsed_time | cpu_usage | memory_usage |
|----+--------------+---------------+----------------+-------------+----------------|
| 40 | 10 | 1 | 51.2827 | 24.8 | 8.01863 |
| 41 | 10 | 2 | 60.1906 | 65.5 | 8.3454 |
| 42 | 100 | 1 | 32.4096 | 64.4 | 8.11009 |
| 43 | 100 | 2 | 33.402 | 60 | 8.16907 |
| 44 | 1000 | 1 | 25.7621 | 62.5 | 8.48084 |
| 45 | 1000 | 2 | 31.2087 | 65 | 9.02304 |
| 46 | 10000 | 1 | 24.5674 | 70.6 | 12.702 |
| 47 | 10000 | 2 | 23.1408 | 63.7 | 13.9474 |
| 48 | 100000 | 1 | 19.4707 | 58.7 | 43.1203 |
| 49 | 100000 | 2 | 21.5641 | 64.6 | 42.0958 |
| 50 | 1e+06 | 1 | 21.23 | 61.9 | 99.1393 |
| 51 | 1e+06 | 2 | 21.2195 | 60.7 | 104.215 |
| 52 | 1e+07 | 1 | 21.5565 | 64.3 | 99.153 |
| 53 | 1e+07 | 2 | 22.712 | 66.1 | 104.216 |
| 54 | 1e+08 | 1 | 20.8239 | 61.9 | 99.1389 |
| 55 | 1e+08 | 2 | 22.5298 | 63.9 | 104.217 |
| 56 | 1e+09 | 1 | 21.4913 | 64.3 | 99.1535 |
| 57 | 1e+09 | 2 | 20.9633 | 58.6 | 104.232 |
| 58 | 1e+10 | 1 | 21.4864 | 64.6 | 99.1389 |
| 59 | 1e+10 | 2 | 22.0327 | 63.9 | 104.216 |
+----+--------------+---------------+----------------+-------------+----------------+
任何关于在哪里改进或改变什么以获得更优解决方案的建议都将非常有用。
1条答案
按热度按时间qxgroojn1#
在Python(CPython)程序中,不能使用线程来并行执行Python代码。
Python诞生于计算机只有一个CPU核心的年代。语言的设计者决定使用一个全局互斥锁——GIL(Global Interpreter Lock,全局解释器锁)——来保护解释器的所有数据结构。GIL在当时没有什么缺点,但在如今的多核CPU时代,它意味着任何两个线程都无法并行执行Python语句。
不幸的是,GIL的存在影响了Python并未明确规定的内存模型(memory model)。在不破坏现有Python程序的前提下,没有办法将GIL从Python中移除。
如果你想让你的程序并行执行语句,就必须使用 `multiprocessing` 模块,而不是 `threading` 模块。