溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

Rust中多線程的使用方法

發(fā)布時(shí)間:2021-07-01 09:38:41 來(lái)源:億速云 閱讀:391 作者:chen 欄目:編程語(yǔ)言

本篇內(nèi)容主要講解“Rust中多線程的使用方法”,感興趣的朋友不妨來(lái)看看。本文介紹的方法操作簡(jiǎn)單快捷,實(shí)用性強(qiáng)。下面就讓小編來(lái)帶大家學(xué)習(xí)“Rust中多線程的使用方法”吧!

Rust對(duì)并發(fā)編程提供了非常豐富的支持,有傳統(tǒng)的多線程方式,也提供流行的異步原語(yǔ)async, await,本篇文章主要介紹多線程方面的基本用法。以下將分為5部分進(jìn)行講解

  1. 線程的創(chuàng)建

  2. 原子變量

  3. 管道 , 條件變量

  4. 生產(chǎn)者消費(fèi)者的實(shí)現(xiàn)

線程的創(chuàng)建

線程的創(chuàng)建非常的簡(jiǎn)單

    let thread = std::thread::spawn(||{
        println!("hello world");
    });
    thread.join(); //等待線程結(jié)束

Rust語(yǔ)言和其他語(yǔ)言不一樣的地方是,如果線程里使用了外部變量,則會(huì)報(bào)錯(cuò)

 let data = String::from("hello world");
    let thread = std::thread::spawn(||{
        println!("{}", data);
    });
    thread.join();

原因如下:
error[E0373]: closure may outlive the current function, but it borrows `data`, which is owned by the current function
  --> src\main.rs:36:37
   |
36 |     let thread = std::thread::spawn(||{
   |                                     ^^ may outlive borrowed value `data`
37 |         println!("{}", data);
   |                        ---- `data` is borrowed here

線程中使用了其他線程的變量是不合法的,必須使用move表明線程擁有data的所有權(quán)

    let data = String::from("hello world");
    let thread = std::thread::spawn(move ||{ //使用move 把data的所有權(quán)轉(zhuǎn)到線程內(nèi)
        println!("{}", data);
    });
    thread.join();

如果想要在多線程間讀寫(xiě)數(shù)據(jù),通常需要加鎖,如java中的 synchornized。與之對(duì)應(yīng),在Rust中需要使用Mutex,由于Mutex是跨線程使用,線程會(huì)轉(zhuǎn)移Mutex的所有權(quán),所以必須配合Arc使用。

fn main() {
    let counter = Arc::new(Mutex::new(0));
    let counter2 = counter.clone();
    let thread = std::thread::spawn(move ||{
       let mut i = counter2.lock().unwrap();//獲取鎖,不需要手動(dòng)釋放,rust的鎖和變量的生命周期一樣,離開(kāi)作用域時(shí),鎖會(huì)自動(dòng)釋放
        *i = *i + 1;
    });
    thread.join();
    let counter = counter.lock().unwrap();
    assert_eq!(1, *counter);
}
原子變量

上面的例子中,我們使用鎖來(lái)實(shí)現(xiàn)對(duì)數(shù)據(jù)的安全訪問(wèn),鎖作用的范圍是調(diào)用lock到鎖對(duì)象的scope結(jié)束,在這段范圍內(nèi)的代碼同一時(shí)間只能被一個(gè)線程訪問(wèn),從這點(diǎn)來(lái)看,使用鎖來(lái)實(shí)現(xiàn)對(duì)單一數(shù)據(jù)的安全訪問(wèn)就有點(diǎn)重了(當(dāng)然從鎖和原子變量的實(shí)現(xiàn)機(jī)制來(lái)說(shuō),鎖也遠(yuǎn)比原子變量重),這時(shí)候使用原子變量效率就會(huì)高很多。

fn main() {
    let counter = Arc::new(AtomicU32::new(0));
    let counter2 = counter.clone();
    let thread = std::thread::spawn(move ||{
        counter2.fetch_add( 1, Ordering::SeqCst);
    });
    counter.fetch_add( 1, Ordering::SeqCst);
    thread.join();
    counter.load(Ordering::SeqCst);
    assert_eq!(2, counter.load(Ordering::SeqCst));
}
管道與條件變量

線程間的通信、協(xié)作,需要有一定的機(jī)制來(lái)支持,管道和條件變量就是這樣的機(jī)制。

  1. 管道(Channel) Rust的channle包含了2個(gè)概念,發(fā)送者和接收者。發(fā)送者可以將消息放入管道,接收者則從管道中讀取消息

fn main() {
    use std::sync::mpsc::channel;
    use std::thread;
    let (sender, receiver) = channel();
    let sender2 = sender.clone();
    thread::spawn(move|| {
        sender2.send(123).unwrap(); //線程1 發(fā)送消息
    });
    thread::spawn(move|| {
        sender.send(456).unwrap(); //線程2 發(fā)送消息
    });
    while let Ok(res) = receiver.recv() { //主線程 接收消息
        println!("{:?}", res);
    }
}

值得注意的是接收者(receiver), 是唯一的,不像發(fā)送者(sender)那樣可以有多個(gè)

  1. 條件變量 條件變量Condvar,不能單獨(dú)使用,需要和監(jiān)視器MutexGuard配合使用。 線程之間通過(guò)調(diào)用 condvar.wait, condvar.notify_all, condvar.notify_one來(lái)實(shí)現(xiàn)線程間通信。

fn println(msg: &str){
    use chrono::Local;
    let date = Local::now();
    println!("{} {}", date.format("%Y-%m-%d %H:%M:%S"), msg)
}
fn main() {
    use std::sync::{Arc, Mutex, Condvar};
    use std::thread;

    let mutex_condva = Arc::new((Mutex::new(false), Condvar::new()));
    let m_c = mutex_condva.clone();

    thread::spawn(move || {
        println("sub thread start..");
        let (lock, cvar) = &*m_c;
        let mut started = lock.lock().unwrap();
         *started = true; //將業(yè)務(wù)參數(shù)設(shè)置為true
        std::thread::sleep(Duration::from_secs(5));
        cvar.notify_all(); // 喚醒條件變量等待者
        println("sub thread finished..");
    });
    println("main thread start..");

    let (lock, cvar) = &*mutex_condva;
    let mut started = lock.lock().unwrap();
    println("main thread begin wait..");
    while !*started { //等待條件變量被喚醒,且等待關(guān)注的業(yè)務(wù)參數(shù)為真。這里需要注意,要在循環(huán)中判斷started,因?yàn)闂l件變量被喚醒時(shí),有可能業(yè)務(wù)條件并未為true
        started = cvar.wait(started).unwrap();
    }
    println("main thread finished..");
}
實(shí)現(xiàn)生產(chǎn)者消費(fèi)者

下面的例子使用條件變量Condar來(lái)實(shí)現(xiàn)多生產(chǎn)者 ,多消費(fèi)者(使用管道比較容易實(shí)現(xiàn),且只能由一個(gè)消費(fèi)者,這里就不介紹了)

struct Queue<T>{
    inner:Vec<T>,
    capacity: usize
}
impl<T> Queue<T> {
    fn new(cap:usize) -> Queue<T> {
        Queue{
            inner: Vec::new(),
            capacity: cap
        }
    }
     fn push(&mut self, data:T) -> bool {
        if !self.is_full() {
            self.inner.push(data);
            true
        } else {
            false
        }
    }
    fn pop(&mut self) -> Option<T> {
        self.inner.pop()
    }
    fn is_empty(&self) -> bool {
        self.inner.is_empty()
    }
    fn is_full(&self) -> bool {
        if self.inner.len() == self.capacity {true} else {false}
    }
}


struct Producer<T>{
    inner:Arc<(Mutex<Queue<T>>,  Condvar)>
}
impl<T:Display> Producer<T> {
    fn new(inner:Arc<(Mutex<Queue<T>>,  Condvar)>) -> Producer<T> {
        Producer{inner}
    }
    fn produce(&self, data:T) {
        let mut queue = self.inner.0.lock().unwrap();
        while (*queue).is_full() {
            println("[Producer] Queue is full, waiting queue to not full");
            queue = self.inner.1.wait(queue).unwrap();
            println("[Producer22] Queue is full, waiting queue to not full");
        }
        println("[Producer] Queue is not full, push data to queue");
        queue.push(data);
       self.inner.1.notify_all();
    }
}

struct Consumer<T>{
    inner: Arc<(Mutex<Queue<T>>,  Condvar)>
}
impl<T:Display> Consumer<T> {
    fn new(inner:Arc<(Mutex<Queue<T>>,  Condvar)>) -> Consumer<T> {
        Consumer{inner}
    }
    fn consume(&self) -> Option<T> {
        let mut queue = self.inner.0.lock().unwrap();
        while (*queue).is_empty() {
            println("[Consumer] Queue is empty, waiting queue to have data");
            queue = self.inner.1.wait(queue).unwrap();
        }
        println("[Consumer] Queue has data, pop data");
        let data = queue.pop();
        self.inner.1.notify_all();
        data
    }
}

fn println(msg: &str){
    use chrono::Local;
    let date = Local::now();
    println!("{:?} {} {}", std::thread::current().id(), date.format("%Y-%m-%d %H:%M:%S"), msg)
}
fn main() {
    let mc = Arc::new((Mutex::new(Queue::<usize>::new(3)), Condvar::new()));
    produce(&mc);
    consume(&mc);
    std::thread::sleep(Duration::from_secs(1000));//主線程等待,不然程序會(huì)提早退出
}

fn produce(mc: &Arc<(Mutex<Queue<usize>>, Condvar)>){
    for i in 0 .. 10 {
        let mc_clone = mc.clone();
        std::thread::spawn(move || {
            std::thread::sleep(Duration::from_secs(random::<u64>() % 10));
            let producer = Producer::new(mc_clone);
             producer.produce(i);
        });
    }
}
fn consume(mc: &Arc<(Mutex<Queue<usize>>, Condvar)>){
    for i in 0 .. 2 {
        let mc_clone = mc.clone();
        std::thread::spawn(move || {
            std::thread::sleep(Duration::from_secs(random::<u64>() % 2));
            let consumer = Consumer::new(mc_clone);
            loop {
                consumer.consume();
            }
        });
    }
}

到此,相信大家對(duì)“Rust中多線程的使用方法”有了更深的了解,不妨來(lái)實(shí)際操作一番吧!這里是億速云網(wǎng)站,更多相關(guān)內(nèi)容可以進(jìn)入相關(guān)頻道進(jìn)行查詢,關(guān)注我們,繼續(xù)學(xué)習(xí)!

向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI