rust 是否在每个线程中使用init代码来创建线程池?

3qpi33ja  于 2022-12-23  发布在  其他
关注(0)|答案(1)|浏览(157)
    • bounty将在4天后过期**。回答此问题可获得+50声望奖励。WebOrCode希望引起更多人关注此问题。

以下代码工作正常,可以在Playground中测试

use std::{thread, time::Duration};
use rand::Rng;

fn main() {
    let mut hiv = Vec::new();
    let (sender, receiver) = crossbeam_channel::unbounded();
    
    // make workers
    for t in 0..5 {
        println!("Make worker {}", t);
        
        let receiver = receiver.clone();  // clone for this thread
        
        let handler = thread::spawn(move || {
            let mut rng = rand::thread_rng(); // each thread have one
            
            loop {
                let r = receiver.recv();
                match r {
                    Ok(x) => {
                        let s = rng.gen_range(100..1000);
                
                        thread::sleep(Duration::from_millis(s));
                        println!("w={} r={} working={}", t, x, s);
                    },
                    _ => { println!("No more work for {} --- {:?}.", t, r); break},
                }
            }
            
        });
        
        hiv.push(handler);
    }
    
    // Generate jobs
    for x in 0..10 {
        sender.send(x).expect("all threads hung up :(");
    }
    drop(sender);
    
    // wait for jobs to finish.
    println!("Wait for all threads to finish.\n");
    for h in hiv {
        h.join().unwrap();
    }
    println!("join() done. Work Finish.");
}

我的问题是:
我可以使用threadpoolrayon或其他Rust板条箱删除样板代码吗?
我知道我可以做我自己的实现,但想知道是否有一些板条箱相同的功能?
从我的研究线程池/人造丝是有用的,当你"发送"代码,它被执行,但我还没有找到方法,使N个线程,将有一些代码/逻辑,他们需要记住?
基本思想是在let mut rng = rand::thread_rng();中,这是每个线程都需要拥有的示例。
还有就是代码还有一些其他的问题,请大家指出来。

sulc1iza

sulc1iza1#

是的,您可以使用Rayon来消除大量的代码,并使剩余的代码更具可读性,如以下要点所示:
https://gist.github.com/BillBarnhill/db07af903cb3c3edb6e715d9cedae028
由于所有权规则的原因,工作池模型在Rust中不是很好,因此并行迭代器通常是更好的选择。
我忘了解决你最关心的问题,每线程上下文,最初。你可以看到如何存储每线程上下文使用ThreadLocal!在这个答案:
https://stackoverflow.com/a/42656422/204343
我将尝试回来编辑代码,以反映ThreadLocal!使用,只要我有更多的时间。
由于thread_id_value的原因,gist要求每夜执行一次,但这几乎是稳定的,如果需要,可以将其删除。
真实的的问题是gust有时间限制,并且比较main_new和main_original,结果令人惊讶。也许不那么令人惊讶,Rayon有很好的调试支持。
在调试版本中,定时输出为:

main_new duration: 1.525667954s
main_original duration: 1.031234059s

您可以看到main_new的运行时间几乎延长了50%。
但是在发布时main_new稍微快了一点:

main_new duration: 1.584190936s
main_original duration: 1.5851124s

下面是要点的精简版,只有新代码。

#![feature(thread_id_value)]

use std::{thread, time::Duration, time::Instant};
use rand::Rng;

#[allow(unused_imports)]
use rayon::prelude::*;

fn do_work(x : u32) -> String {
    let mut rng = rand::thread_rng(); // each thread have one
    let s = rng.gen_range(100..1000);
    let thread_id = thread::current().id();
    
    let t = thread_id.as_u64();
    
    thread::sleep(Duration::from_millis(s));
    format!("w={} r={} working={}", t, x, s)
}

fn process_work_product(output : String) {
    println!("{}", output);
}

fn main() {
    // bit hacky, but lets set number of threads to 5
    rayon::ThreadPoolBuilder::new()
        .num_threads(4)
        .build_global()
        .unwrap();
    
    let x = 0..10;
    x.into_par_iter()
        .map(do_work)
        .for_each(process_work_product);
}

相关问题