溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

C++11中線程鎖和條件變量的示例分析

發(fā)布時間:2021-06-09 09:45:25 來源:億速云 閱讀:170 作者:小新 欄目:開發(fā)技術

這篇文章主要介紹了C++11中線程鎖和條件變量的示例分析,具有一定借鑒價值,感興趣的朋友可以參考下,希望大家閱讀完這篇文章之后大有收獲,下面讓小編帶著大家一起了解一下。

線程

std::thread類, 位于<thread>頭文件,實現(xiàn)了線程操作。std::thread可以和普通函數(shù)和 lambda 表達式搭配使用。它還允許向線程的執(zhí)行函數(shù)傳遞任意多參數(shù)。

#include <thread>
void func()
{
   // do some work
}
int main()
{
   std::thread t(func);
   t.join();
   return 0;
}

上面的例子中,t是一個線程實例,函數(shù)func()在該線程運行。調(diào)用join()函數(shù)是為了阻塞當前線程(此處即主線程),直到t線程執(zhí)行完畢。線程函數(shù)的返回值都會被忽略,但線程函數(shù)接受任意數(shù)目的輸入?yún)?shù)。

void func(int i, double d, const std::string& s)
{
    std::cout << i << ", " << d << ", " << s << std::endl;
}
int main()
{
   std::thread t(func, 1, 12.50, "sample");
   t.join();
   return 0;
}

雖然可以向線程函數(shù)傳遞任意多參數(shù),但都必須以值傳遞。如果需以引用傳遞,則必須以std::ref或std::cref封裝,如下例所示:

void func(int& a)
{
   a++;
}
int main()
{
   int a = 42;
   std::thread t(func, std::ref(a));
   t.join();
   std::stringcout << a << std::endl;
   return 0;
}

這個程序會打印43,但如果不用std::ref封裝,則輸出會是42。

除了join函數(shù),這個類還提供更多的操作:

swap:交換兩個線程實例的句柄

detach:允許一個線程繼續(xù)獨立于線程實例運行;detach 過的線程不可以再 join

int main()
{
    std::thread t(funct);
    t.detach();

    return 0;
}

一個重要的知識點是,如果一個線程函數(shù)拋出異常,并不會被常規(guī)的try-catch方法捕獲。也就是說,下面的寫法是不會奏效的:

try
{
    std::thread t1(func);
    std::thread t2(func);

    t1.join();
    t2.join();
}
catch(const std::exception& ex)
{
    std::cout << ex.what() << std::endl;
}

要追蹤線程間的異常,你可以在線程函數(shù)內(nèi)捕獲,暫時存儲在一個稍后可以訪問的結構內(nèi)。

std::mutex                       g_mutex;
std::vector<std::exception_ptr>  g_exceptions;

void throw_function()
{
   throw std::exception("something wrong happened");
}

void func()
{
   try
   {
      throw_function();
   }
   catch(...)
   {
      std::lock_guard<std::mutex> lock(g_mutex);
      g_exceptions.push_back(std::current_exception());
   }
}

int main()
{
   g_exceptions.clear();

   std::thread t(func);
   t.join();

   for(auto& e : g_exceptions)
   {
      try 
      {
         if(e != nullptr)
         {
            std::rethrow_exception(e);
         }
      }
      catch(const std::exception& e)
      {
         std::cout << e.what() << std::endl;
      }
   }

   return 0;
}

關于捕獲和處理異常,更深入的信息可以參看Handling C++ exceptions thrown from worker thread in the main thread和How can I propagate exceptions between threads?。

此外,值得注意的是,頭文件還在 `std::this_thread` 命名空間下提供了一些輔助函數(shù):

  • get_id: 返回當前線程的 id

  • yield: 告知調(diào)度器運行其他線程,可用于當前處于繁忙的等待狀態(tài)

  • sleep_for:給定時長,阻塞當前線程

  • sleep_until:阻塞當前線程至給定時間點

在上個例子中,我們需要對g_exceptions這個 vector 的訪問進行同步處理,確保同一時刻只有一個線程能向它插入新的元素。為此我使用了一個 mutex 和一個鎖(lock)。mutex 是同步操作的主體,在 C++ 11 的<mutex>頭文件中,有四種風格的實現(xiàn):

  • mutex:提供了核心的lock()unlock()方法,以及當 mutex 不可用時就會返回的非阻塞方法try_lock()

  • recursive_mutex:允許同一線程內(nèi)對同一 mutex 的多重持有

  • timed_mutex: 與mutex類似,但多了try_lock_for()try_lock_until()兩個方法,用于在特定時長里持有 mutex,或持有 mutex 直到某個特定時間點

  • recursive_timed_mutex:recursive_mutex和timed_mutex的結合

下面是一個使用std::mutex的例子(注意get_id()和sleep_for()兩個輔助方法的使用)。

#include <iostream>
#include <thread>
#include <mutex>
#include <chrono>
 
std::mutex g_lock;
 
void func()
{
    g_lock.lock();
 
    std::cout << "entered thread " << std::this_thread::get_id() << std::endl;
    std::this_thread::sleep_for(std::chrono::seconds(rand() % 10));
    std::cout << "leaving thread " << std::this_thread::get_id() << std::endl;
 
    g_lock.unlock();
}
 
int main()
{
    srand((unsigned int)time(0));
 
    std::thread t1(func);
    std::thread t2(func);
    std::thread t3(func);
 
    t1.join();
    t2.join();
    t3.join();
 
    return 0;
}

輸出如下:

entered thread 10144

leaving thread 10144

entered thread 4188

leaving thread 4188

entered thread 3424

leaving thread 3424

lock()unlock()兩個方法應該很好懂,前者鎖住 mutex,如果該 mutex 不可用,則阻塞線程;稍后,后者解鎖線程。

下面一個例子展示了一個簡單的線程安全的容器(內(nèi)部使用了std::vector)。該容器提供用于添加單一元素的add()方法,以及添加多個元素的addrange()方法(內(nèi)部調(diào)用add()實現(xiàn))。

注意:盡管如此,下面會指出,由于va_args的使用等原因,這個容器并非真正線程安全。此外,dump()方法不應屬于容器,在實際實現(xiàn)中它應該作為一個獨立的輔助函數(shù)。這個例子的目的僅僅是展示 mutex 的相關概念,而非實現(xiàn)一個完整的線程安全的容器。

template <typename T>
class container 
{
    std::mutex _lock;
    std::vector<T> _elements;
public:
    void add(T element) 
    {
        _lock.lock();
        _elements.push_back(element);
        _lock.unlock();
    }
 
    void addrange(int num, ...)
    {
        va_list arguments;
 
        va_start(arguments, num);
 
        for (int i = 0; i < num; i++)
        {
            _lock.lock();
            add(va_arg(arguments, T));
            _lock.unlock();
        }
 
        va_end(arguments); 
    }
 
    void dump()
    {
        _lock.lock();
        for(auto e : _elements)
            std::cout << e << std::endl;
        _lock.unlock();
    }
};
 
void func(container<int>& cont)
{
    cont.addrange(3, rand(), rand(), rand());
}
 
int main()
{
    srand((unsigned int)time(0));
 
    container<int> cont;
 
    std::thread t1(func, std::ref(cont));
    std::thread t2(func, std::ref(cont));
    std::thread t3(func, std::ref(cont));
 
    t1.join();
    t2.join();
    t3.join();
 
    cont.dump();
 
    return 0;
}

當你運行這個程序時,會進入死鎖。原因:在 mutex 被釋放前,容器嘗試多次持有它,這顯然不可能。這就是為什么引入std::recursive_mutex,它允許一個線程對 mutex 多重持有。允許的最大持有次數(shù)并不確定,但當達到上限時,線程鎖會拋出std::system_error錯誤。因此,要解決上面例子的錯誤,除了修改addrange令其不再調(diào)用lock和unlock之外,可以用std::recursive_mutex代替mutex。

template <typename T>
class container 
{
    std::recursive_mutex _lock;
    // ...
};

成功輸出:

6334

18467

41

6334

18467

41

6334

18467

41

敏銳的讀者可能注意到,每次調(diào)用func()輸出的都是相同的數(shù)字。這是因為,seed 是線程局部量,調(diào)用srand()只會在主線程中初始化 seed,在其他工作線程中 seed 并未被初始化,所以每次得到的數(shù)字都是一樣的。

手動加鎖和解鎖可能造成問題,比如忘記解鎖或鎖的次序出錯,都會造成死鎖。C++ 11 標準提供了若干類和函數(shù)來解決這個問題。封裝類允許以 RAII 風格使用 mutex,在一個鎖的生存周期內(nèi)自動加鎖和解鎖。這些封裝類包括:

lock_guard:當一個實例被創(chuàng)建時,會嘗試持有 mutex (通過調(diào)用lock());當實例銷毀時,自動釋放 mutex (通過調(diào)用unlock())。不允許拷貝。

unique_lock:通用 mutex 封裝類,與lock_guard不同,還支持延遲鎖、計時鎖、遞歸鎖、移交鎖的持有權,以及使用條件變量。不允許拷貝,但允許轉移(move)。

借助這些封裝類,可以把容器改寫為:

template <typename T>
class container 
{
    std::recursive_mutex _lock;
    std::vector<T> _elements;
public:
    void add(T element) 
    {
        std::lock_guard<std::recursive_mutex> locker(_lock);
        _elements.push_back(element);
    }
 
    void addrange(int num, ...)
    {
        va_list arguments;
 
        va_start(arguments, num);
 
        for (int i = 0; i < num; i++)
        {
            std::lock_guard<std::recursive_mutex> locker(_lock);
            add(va_arg(arguments, T));
        }
 
        va_end(arguments); 
    }
 
    void dump()
    {
        std::lock_guard<std::recursive_mutex> locker(_lock);
        for(auto e : _elements)
            std::cout << e << std::endl;
    }
};

讀者可能會提出,dump()方法不更改容器的狀態(tài),應該設為 const。但如果你添加 const 關鍵字,會得到如下編譯錯誤:

‘std::lock_guard<_Mutex>::lock_guard(_Mutex &)' : cannot convert parameter 1 from ‘const std::recursive_mutex' to ‘std::recursive_mutex &'

一個 mutex (不管何種風格)必須被持有和釋放,這意味著lock()unlock方法必被調(diào)用,這兩個方法是 non-const 的。所以,邏輯上lock_guard的聲明不能是 const (若該方法 為 const,則 mutex 也為 const)。這個問題的解決辦法是,將 mutex 設為mutable。mutable允許由 const 方法更改 mutex 狀態(tài)。不過,這種用法僅限于隱式的,或「元(meta)」狀態(tài)——譬如,運算過的高速緩存、檢索完成的數(shù)據(jù),使得下次調(diào)用能瞬間完成;或者,改變像 mutex 之類的位元,僅僅作為一個對象的實際狀態(tài)的補充。

template <typename T>
class container 
{
   mutable std::recursive_mutex _lock;
   std::vector<T> _elements;
public:
   void dump() const
   {
      std::lock_guard<std::recursive_mutex> locker(_lock);
      for(auto e : _elements)
         std::cout << e << std::endl;
   }
};

這些封裝類鎖的構造函數(shù)可以通過重載的聲明來指定鎖的策略??捎玫牟呗杂校?/p>

  • defer_lock_t類型的defer_lock:不持有 mutex

  • try_to_lock_t類型的try_to_lock: 嘗試持有 mutex 而不阻塞線程

  • adopt_lock_t類型的adopt_lock:假定調(diào)用它的線程已持有 mutex

這些策略的聲明方式如下:

struct defer_lock_t { };
struct try_to_lock_t { };
struct adopt_lock_t { };
 
constexpr std::defer_lock_t defer_lock = std::defer_lock_t();
constexpr std::try_to_lock_t try_to_lock = std::try_to_lock_t();
constexpr std::adopt_lock_t adopt_lock = std::adopt_lock_t();

除了這些 mutex 封裝類之外,標準庫還提供了兩個方法用于鎖住一個或多個 mutex:

lock:鎖住 mutex,通過一個避免了死鎖的算法(通過調(diào)用lock(),try_lock()和unlock()實現(xiàn))

try_lock:嘗試通過調(diào)用try_lock()來調(diào)用多個 mutex,調(diào)用次序由 mutex 的指定次序而定

下面是一個死鎖案例:有一個元素容器,以及一個exchange()函數(shù)用于互換兩個容器里的某個元素。為了實現(xiàn)線程安全,這個函數(shù)通過一個和容器關聯(lián)的 mutex,對這兩個容器的訪問進行同步。

template <typename T>
class container 
{
public:
    std::mutex _lock;
    std::set<T> _elements;
 
    void add(T element) 
    {
        _elements.insert(element);
    }
 
    void remove(T element) 
    {
        _elements.erase(element);
    }
};
 
void exchange(container<int>& cont1, container<int>& cont2, int value)
{
    cont1._lock.lock();
    std::this_thread::sleep_for(std::chrono::seconds(1)); // <-- forces context switch to simulate the deadlock
    cont2._lock.lock();    
 
    cont1.remove(value);
    cont2.add(value);
 
    cont1._lock.unlock();
    cont2._lock.unlock();
}

假如這個函數(shù)在兩個線程中被調(diào)用,在其中一個線程中,一個元素被移出容器 1 而加到容器 2;在另一個線程中,它被移出容器 2 而加到容器 1。這可能導致死鎖——當一個線程剛持有第一個鎖,程序馬上切入另一個線程的時候。

int main()
{
    srand((unsigned int)time(NULL));
 
    container<int> cont1; 
    cont1.add(1);
    cont1.add(2);
    cont1.add(3);
 
    container<int> cont2; 
    cont2.add(4);
    cont2.add(5);
    cont2.add(6);
 
    std::thread t1(exchange, std::ref(cont1), std::ref(cont2), 3);
    std::thread t2(exchange, std::ref(cont2), std::ref(cont1), 6);
 
    t1.join();
    t2.join();
 
    return 0;
}

要解決這個問題,可以使用std::lock,保證所有的鎖都以不會死鎖的方式被持有:

void exchange(container<int>& cont1, container<int>& cont2, int value)
{
    std::lock(cont1._lock, cont2._lock); 
 
    cont1.remove(value);
    cont2.add(value);
 
    cont1._lock.unlock();
    cont2._lock.unlock();
}

條件變量

C++ 11 提供的另一個同步機制是條件變量,用于阻塞一個或多個線程,直到接收到另一個線程的通知信號,或暫停信號,或偽喚醒信號。在<condition_variable>頭文件里,有兩個風格的條件變量實現(xiàn):

condition_variable:所有需要等待這個條件變量的線程,必須先持有一個std::unique_lock

condition_variable_any:更通用的實現(xiàn),任何滿足鎖的基本條件(提供lock()和unlock()功能)的類型都可以使用;在性能和系統(tǒng)資源占用方面可能消耗更多,因而只有在它的靈活性成為必需的情況下才應優(yōu)先使用

條件變量的工作機制如下:

至少有一個線程在等待某個條件成立。等待的線程必須先持有一個unique_lock鎖。這個鎖被傳遞給wait()方法,這會釋放 mutex,阻塞線程直至條件變量收到通知信號。當收到通知信號,線程喚醒,重新持有鎖。

至少有一個線程在發(fā)送條件成立的通知信號。信號的發(fā)送可以用notify_one()方法, 只解鎖任意一個正在等待通知信號的線程,也可以用notify_all()方法, 解鎖所有等待條件成立信號的線程。

在多核處理器系統(tǒng)上,由于使條件喚醒完全可預測的某些復雜機制的存在,可能發(fā)生偽喚醒,即一個線程在沒有別的線程發(fā)送通知信號時也會喚醒。因而,當線程喚醒時,檢查條件是否成立是必要的。而且,偽喚醒可能多次發(fā)生,所以條件檢查要在一個循環(huán)里進行。

下面的代碼展示使用條件變量進行線程同步的實例: 幾個工作員線程在運行過程中會產(chǎn)生錯誤,他們將錯誤碼存在一個隊列里。一個記錄員線程處理這些錯誤碼,將錯誤碼從記錄隊列里取出并打印出來。工作員會在發(fā)生錯誤時,給記錄員發(fā)送信號。記錄員則等待條件變量的通知信號。為了避免偽喚醒,等待工作放在一個檢查布爾值的循環(huán)內(nèi)。

#include <thread>
#include <mutex>
#include <condition_variable>
#include <iostream>
#include <queue>
#include <random>

std::mutex              g_lockprint;
std::mutex              g_lockqueue;
std::condition_variable g_queuecheck;
std::queue<int>         g_codes;
bool                    g_done;
bool                    g_notified;

void workerfunc(int id, std::mt19937& generator)
{
    // print a starting message
    {
        std::unique_lock<std::mutex> locker(g_lockprint);
        std::cout << "[worker " << id << "]\trunning..." << std::endl;
    }

    // simulate work
    std::this_thread::sleep_for(std::chrono::seconds(1 + generator() % 5));

    // simulate error
    int errorcode = id*100+1;
    {
        std::unique_lock<std::mutex> locker(g_lockprint);
        std::cout  << "[worker " << id << "]\tan error occurred: " << errorcode << std::endl;
    }

    // notify error to be logged
    {
        std::unique_lock<std::mutex> locker(g_lockqueue);
        g_codes.push(errorcode);
        g_notified = true;
        g_queuecheck.notify_one();
    }
}

void loggerfunc()
{
    // print a starting message
    {
        std::unique_lock<std::mutex> locker(g_lockprint);
        std::cout << "[logger]\trunning..." << std::endl;
    }

    // loop until end is signaled
    while(!g_done)
    {
        std::unique_lock<std::mutex> locker(g_lockqueue);

        while(!g_notified) // used to avoid spurious wakeups 
        {
            g_queuecheck.wait(locker);
        }

        // if there are error codes in the queue process them
        while(!g_codes.empty())
        {
            std::unique_lock<std::mutex> locker(g_lockprint);
            std::cout << "[logger]\tprocessing error:  " << g_codes.front()  << std::endl;
            g_codes.pop();
        }

        g_notified = false;
    }
}

int main()
{
    // initialize a random generator
    std::mt19937 generator((unsigned int)std::chrono::system_clock::now().time_since_epoch().count());

    // start the logger
    std::thread loggerthread(loggerfunc);

    // start the working threads
    std::vector<std::thread> threads;
    for(int i = 0; i < 5; ++i)
    {
        threads.push_back(std::thread(workerfunc, i+1, std::ref(generator)));
    }

    // work for the workers to finish
    for(auto& t : threads)
        t.join();

    // notify the logger to finish and wait for it
    g_done = true;
    loggerthread.join();

    return 0;
}

運行這個程序,輸出如下(注意這個輸出在每次運行下都會改變,因為每個工作員線程的工作和休眠的時間間隔是任意的):

[logger]        running...

[worker 1]      running...

[worker 2]      running...

[worker 3]      running...

[worker 4]      running...

[worker 5]      running...

[worker 1]      an error occurred: 101

[worker 2]      an error occurred: 201

[logger]        processing error:  101

[logger]        processing error:  201

[worker 5]      an error occurred: 501

[logger]        processing error:  501

[worker 3]      an error occurred: 301

[worker 4]      an error occurred: 401

[logger]        processing error:  301

[logger]        processing error:  401

上面的wait()有兩個重載:

其中一個只需要傳入一個unique_lock;這個重載方法釋放鎖,阻塞線程并將其添加到一個等待該條件變量的線程隊列里;該線程在收到條件變量通知信號或偽喚醒時喚醒,這時鎖被重新持有,函數(shù)返回。

另外一個在unique_lock之外,還接收一個謂詞(predicate),循環(huán)直至其返回 false;這個重載可用于避免偽喚醒,其功能類似于:

while(!predicate()) 
      wait(lock);

于是,上面例子中布爾值g_notified可以不用,而代之以wait的接收謂詞的重載,用于確認狀態(tài)隊列的狀態(tài)(是否為空):

void workerfunc(int id, std::mt19937& generator)
{
    // print a starting message
    {
        std::unique_lock<std::mutex> locker(g_lockprint);
        std::cout << "[worker " << id << "]\trunning..." << std::endl;
    }

    // simulate work
    std::this_thread::sleep_for(std::chrono::seconds(1 + generator() % 5));

    // simulate error
    int errorcode = id*100+1;
    {
        std::unique_lock<std::mutex> locker(g_lockprint);
        std::cout << "[worker " << id << "]\tan error occurred: " << errorcode << std::endl;
    }

    // notify error to be logged
    {
        std::unique_lock<std::mutex> locker(g_lockqueue);
        g_codes.push(errorcode);
        g_queuecheck.notify_one();
    }
}

void loggerfunc()
{
    // print a starting message
    {
        std::unique_lock<std::mutex> locker(g_lockprint);
        std::cout << "[logger]\trunning..." << std::endl;
    }

    // loop until end is signaled
    while(!g_done)
    {
        std::unique_lock<std::mutex> locker(g_lockqueue);

        g_queuecheck.wait(locker, [&](){return !g_codes.empty();});

        // if there are error codes in the queue process them
        while(!g_codes.empty())
        {
            std::unique_lock<std::mutex> locker(g_lockprint);
            std::cout << "[logger]\tprocessing error:  " << g_codes.front() << std::endl;
            g_codes.pop();
        }
    }
}

除了可重載的wait(),還有另外兩個等待方法,都有類似的接收謂詞以避免偽喚醒的重載方法:

wait_for:阻塞線程,直至收到條件變量通知信號,或指定時間段已過去。

wait_until:阻塞線程,直到收到條件變量通知信號,或指定時間點已達到。

這兩個方法如果不傳入謂詞,會返回一個cv_status,告知是到達設定時間還是線程因條件變量通知信號或偽喚醒而喚醒。

標準庫還提供了notify_all_at_thread_exit方法,實現(xiàn)了通知其他線程某個給定線程已經(jīng)結束,以及銷毀所有thread_local實例的機制。引入這個方法的原因是,在使用thread_local時, 等待一些通過非join()機制引入的線程可能造成錯誤行為,因為在等待的線程恢復或可能結束之后,他們的析構方法可能還在被調(diào)用(參看N3070和N2880)。特別的,對這個函數(shù)的一個調(diào)用,必須發(fā)生在線程剛好退出之前。下面是一個notify_all_at_thread_exit和condition_variable搭配使用來同步兩個線程的實例:

std::mutex              g_lockprint;
std::mutex              g_lock;
std::condition_variable g_signal;
bool                    g_done;

void workerfunc(std::mt19937& generator)
{
   {
      std::unique_lock<std::mutex> locker(g_lockprint);
      std::cout << "worker running..." << std::endl;
   }

   std::this_thread::sleep_for(std::chrono::seconds(1 + generator() % 5));

   {
      std::unique_lock<std::mutex> locker(g_lockprint);
      std::cout << "worker finished..." << std::endl;
   }

   std::unique_lock<std::mutex> lock(g_lock);
   g_done = true;
   std::notify_all_at_thread_exit(g_signal, std::move(lock));
}

int main()
{
   // initialize a random generator
   std::mt19937 generator((unsigned int)std::chrono::system_clock::now().time_since_epoch().count());

   std::cout << "main running..." << std::endl;

   std::thread worker(workerfunc, std::ref(generator));
   worker.detach();

   std::cout << "main crunching..." << std::endl;

   std::this_thread::sleep_for(std::chrono::seconds(1 + generator() % 5));

   {
      std::unique_lock<std::mutex> locker(g_lockprint);
      std::cout << "main waiting for worker..." << std::endl;
   }

   std::unique_lock<std::mutex> lock(g_lock);
   while(!g_done) // avoid spurious wake-ups
      g_signal.wait(lock);

   std::cout << "main finished..." << std::endl;

   return 0;
}

如果 worker 在主線程之前結束,輸出如下:

main running...

worker running...

main crunching...

worker finished...

main waiting for worker...

main finished...

如果主線程在 worker 線程之前結束,輸出如下:

main running...

worker running...

main crunching...

main waiting for worker...

worker finished...

main finished...

感謝你能夠認真閱讀完這篇文章,希望小編分享的“C++11中線程鎖和條件變量的示例分析”這篇文章對大家有幫助,同時也希望大家多多支持億速云,關注億速云行業(yè)資訊頻道,更多相關知識等著你來學習!

向AI問一下細節(jié)

免責聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權內(nèi)容。

c++
AI