Python: how to pass a dictionary to a concurrent.futures executor

Posted by jgovgodb on 2023-06-04 in Python

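I'm new to concurrent.futures and can't find any examples of how to do this. I have a global dictionary, data, and I want the function called by the concurrent.futures executor to add its results to it. The function runs fine, but nothing ends up in the dictionary. Thanks for your help.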

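import collections
import concurrent.futures
import glob
import gzip
import sys

from Bio import SeqIO
from scipy.stats import entropy

def estimate_shannon_entropy(dna_sequence):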
    bases = collections.Counter([tmp_base for tmp_base in dna_sequence])
    # define distribution
    dist = [x/sum(bases.values()) for x in bases.values()]

    # use scipy to calculate entropy
    entropy_value = entropy(dist, base=2)
    #norm_ent = entropy_value/math.log(len(dna_sequence),2)
    return entropy_value

def shan(i):
    
    name1=i.split("/")[-1]
    
    ext1=name1.split(".")[-1]
    
    print(name1)
    
    if ext1=="gz":
        #print("gz detected")
        f=gzip.open(i,'rt')
        k=name1.split(".")[-2]
    
    else:
        f=open(i,'r')
        k=ext1
    
    if k[-1]=="a":
        fmt="fasta"
        #print("fasta")
    if k[-1]=="q":
        fmt="fastq"
        #print("fastq")
    c=0
    shannon_total=0
    for x in SeqIO.parse(f,fmt):
        c=c+1
        if c<=samples:
            shannon = estimate_shannon_entropy(str(x.seq))
            shannon_total = shannon_total +shannon
        
    ans=float(shannon_total/samples)
    
    data[name1]=ans
    
folder=sys.argv[1] 
filelist=glob.glob(folder)
filelist.sort(key=tokenize)
#print(filelist)

samples=int(sys.argv[2])
threads=int(sys.argv[3])

global data
data={}

executor = concurrent.futures.ProcessPoolExecutor(threads)
futures = [executor.submit(shan, i) for i in filelist]
concurrent.futures.wait(futures)

print(data)
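
A note on why data prints empty here: each worker of a ProcessPoolExecutor is a separate process with its own copy of the module-level globals, so an assignment made inside a worker is not visible in the parent. The minimal sketch below reproduces that behaviour; record and run_demo are hypothetical names used only for illustration, and the file names are made up.

```python
import concurrent.futures

data = {}  # module-level dict, as in the question


def record(key):
    # Runs in a worker process: this mutates the worker's own copy of `data`.
    data[key] = len(key)
    return key


def run_demo():
    with concurrent.futures.ProcessPoolExecutor(max_workers=2) as executor:
        done = list(executor.map(record, ["a.fasta", "b.fastq"]))
    # The tasks ran, but the parent's dict was never touched.
    return done, dict(data)


if __name__ == "__main__":
    print(run_demo())
```

The tasks complete normally, yet the dict seen by the parent is still empty, which matches the symptom described above.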

Answer #1, by htzpubme

OK, I found an answer and will leave it here in case there is a better way (there surely is). I used a Manager:

import concurrent.futures
from multiprocessing import Manager

manager = Manager()
data = manager.dict()
executor = concurrent.futures.ProcessPoolExecutor(threads)
# shan must now accept the dict as a second parameter: def shan(i, data)
futures = [executor.submit(shan, i, data) for i in filelist]
concurrent.futures.wait(futures)
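
For reference, here is a self-contained sketch of the same idea. The worker shan_stub is a hypothetical stand-in for shan (it only stores the length of the file name), and the file names are invented. The point is that the worker receives the managed dict as an explicit second argument.

```python
import concurrent.futures
from multiprocessing import Manager


def shan_stub(path, data):
    # Hypothetical stand-in for shan(): store one value per input under its base name.
    name = path.split("/")[-1]
    data[name] = len(name)


def run(filelist, threads=2):
    with Manager() as manager:
        data = manager.dict()  # proxy object; writes go through the manager process
        with concurrent.futures.ProcessPoolExecutor(threads) as executor:
            futures = [executor.submit(shan_stub, i, data) for i in filelist]
            concurrent.futures.wait(futures)
            for f in futures:
                f.result()  # re-raise any exception that happened in a worker
        # Copy into a plain dict before the manager shuts down.
        return dict(data)


if __name__ == "__main__":
    print(run(["reads/a.fasta", "reads/b.fastq.gz"]))
```

Calling result() on each future is optional, but without it an exception raised inside a worker passes silently and the dict simply stays incomplete.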

Answer #2, by toiithl6

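If anyone comes here trying to do this with ProcessPoolExecutor.map(), I implemented a similar solution that uses a list of tuples as the iterable. The called function can then unpack each tuple into separate variables, similar to how Pool.starmap() works.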

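from concurrent.futures import ProcessPoolExecutor
from multiprocessing import Manager

# Defined before use so the name exists when map() is called
def modify_data(data_tuple):
    data, managed_dict = data_tuple
    ...  # <your code here>

if __name__ == "__main__":
    with Manager() as manager:
        managed_dict = manager.dict()
        # data_list is your own list of inputs
        args = [(data, managed_dict) for data in data_list]
        with ProcessPoolExecutor(max_workers=4) as p:
            modified_data = list(p.map(modify_data, args))
        # Copy the results out before the manager shuts down
        results = dict(managed_dict)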
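
To make the pattern concrete, here is a small runnable sketch with a toy modify_data. The arithmetic is invented purely for illustration; only the tuple-unpacking structure comes from the answer above.

```python
from concurrent.futures import ProcessPoolExecutor
from multiprocessing import Manager


def modify_data(data_tuple):
    # Unpack the (item, shared dict) pair, as Pool.starmap() would do for us.
    item, managed_dict = data_tuple
    managed_dict[item] = item * item  # toy side effect on the shared dict
    return item + 1                   # toy return value collected by map()


def run(data_list):
    with Manager() as manager:
        managed_dict = manager.dict()
        args = [(item, managed_dict) for item in data_list]
        with ProcessPoolExecutor(max_workers=4) as p:
            # map() yields results in the same order as the inputs
            modified_data = list(p.map(modify_data, args))
        return modified_data, dict(managed_dict)


if __name__ == "__main__":
    print(run([1, 2, 3]))
```

Note that map() already hands back each call's return value, so if the worker can simply return what it computed, the shared dict may not be needed at all.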
