溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

C++如何實(shí)現(xiàn)線程池

發(fā)布時(shí)間:2020-07-28 15:40:54 來(lái)源:億速云 閱讀:199 作者:小豬 欄目:編程語(yǔ)言

這篇文章主要為大家展示了C++如何實(shí)現(xiàn)線程池,內(nèi)容簡(jiǎn)而易懂,希望大家可以學(xué)習(xí)一下,學(xué)習(xí)完之后肯定會(huì)有收獲的,下面讓小編帶大家一起來(lái)看看吧。

最近自己寫了一個(gè)線程池。

總的來(lái)說(shuō),線程池就是有一個(gè)任務(wù)隊(duì)列,一個(gè)線程隊(duì)列,線程隊(duì)列不斷地去取任務(wù)隊(duì)列中的任務(wù)來(lái)執(zhí)行,當(dāng)任務(wù)隊(duì)列中為空時(shí),線程阻塞等待新的任務(wù)添加過(guò)來(lái)。

我是用queue來(lái)存放任務(wù),vector存放thread*,然后用condition_variable 來(lái)設(shè)置線程阻塞和喚醒。

下面直接上代碼吧。

線程池類頭文件Thread_Pool.h

/********************************************
   線程池頭文件

  Author:十面埋伏但莫慌
  Time:2020/05/03

*********************************************/
#pragma once
#ifndef _THREAD_POOL_H_
#define _THREAD_POOL_H_
#include<thread>
#include<queue>
#include<mutex>
#include<atomic>
#include<vector>
#include<condition_variable>

typedef std::function<void()> Func;//定義線程執(zhí)行函數(shù)類型,方便后面編碼使用。
//任務(wù)類
class Task {
public:
 Task() {}
 ~Task() {}
 int push(Func func);//添加任務(wù);
 int getTaskNum();//獲得當(dāng)前隊(duì)列中的任務(wù)數(shù);
 Func pop();//取出待執(zhí)行的任務(wù);
public:
 std::mutex mx;//鎖;
private:
 
 std::queue<Func> tasks;//任務(wù)隊(duì)列
};
//線程池類
class Thread_Pool {
public:
 Thread_Pool() :IsStart(false) {}
 ~Thread_Pool();
 int addTasks(Func tasks);//添加任務(wù);
 void start();//開啟線程池;
 void stop();//關(guān)閉線程池;
 void run();//線程工作函數(shù);
 int getTaskNum();//獲得當(dāng)前隊(duì)列中的任務(wù)數(shù);
private:
 static const int maxThreadNum = 3;//最大線程數(shù)為3;

 std::condition_variable cond;//條件量;
 std::vector<std::thread*> threads;//線程向量;
 std::atomic<bool> IsStart;//原子變量,判斷線程池是否運(yùn)行;
 Task tasks;//任務(wù)變量;
};
#endif

然后是線程池類成員函數(shù)定義文件Thread_Pool.cpp

/********************************************
   線程池CPP文件

  Author:十面埋伏但莫慌
  Time:2020/05/03

*********************************************/
#include"Thread_Pool.h"
#include<iostream>
int Task::push(Func func) {
 std::unique_lock<std::mutex> lock(mx);
 try {
  tasks.emplace(func);
 }
 catch (std::exception e)
 {
  throw e;
  return -1;
 }
 return 0;
}
int Task::getTaskNum()
{
 return tasks.size();
}
Func Task::pop() {
 std::unique_lock<std::mutex> lock(mx);
 Func temp;
 if (tasks.empty())
  return temp;
 else
 {
  temp = tasks.front();
  tasks.pop();
  return temp;
 }
}
int Thread_Pool::addTasks(Func func)
{
 
 int ret = tasks.push(func);
 cond.notify_one();
 return ret;
}
void Thread_Pool::start() {
 if (!IsStart) {
  IsStart = true;
  for (int i = 0; i < maxThreadNum; i++)
  {
   threads.emplace_back(new std::thread(std::bind(&Thread_Pool::run,this)));   
  }
  
 }
}
void Thread_Pool::run()
{
 while (IsStart)
 {
  Func f;
  if (tasks.getTaskNum() == 0 && IsStart)
  {
   std::unique_lock<std::mutex> lock(tasks.mx);
   cond.wait(lock);
  }
  if (tasks.getTaskNum() != 0 && IsStart)
  {
   f = tasks.pop();
   if(f)
    f();
  }

 }
}
int Thread_Pool::getTaskNum() {
 return tasks.getTaskNum();
}
void Thread_Pool::stop() {

  IsStart = false;
  cond.notify_all();
  for (auto T : threads) {
   std::cout << "線程 " << T->get_id() << " 已停止。" << std::endl;
   T->join();
   if (T != nullptr)
   {
    delete T;
    T = nullptr;
   }
  }
 std::cout << "所有線程已停止。" << std::endl;
}
Thread_Pool::~Thread_Pool() {
 if (IsStart)
 {
  stop();
 }
}

最后是測(cè)試用的main.cpp

#include<iostream>
#include"Thread_Pool.h"
using namespace std;
void string_out_one() {
 cout << "One!" << endl;
}
void string_out_two() {
 cout << "Two!" << endl;
}
void string_out_three() {
 cout << "Three!" << endl;
}
int main() {
 {
  Thread_Pool Pool;
  try {
   Pool.start();
  }
  catch (std::exception e)
  {
   throw e;
   cout << "線程池創(chuàng)建失敗。" << endl;
  }
  for (int i = 0; i < 50000 ;)
  {  
   if (Pool.getTaskNum() < 1000) {
    Pool.addTasks(string_out_one);
    Pool.addTasks(string_out_two);
    Pool.addTasks(string_out_three);
    std::cout << i++ << std::endl;
   }
  }
  getchar();
 }
 getchar();
 return 0;
}

執(zhí)行的效果如下:

C++如何實(shí)現(xiàn)線程池

線程喚醒和阻塞的邏輯就是在線程工作函數(shù)run函數(shù)中,判斷隊(duì)列是否為空,若為空則設(shè)置鎖并調(diào)用condition變量的wait函數(shù),釋放這個(gè)線程中的鎖并阻塞線程,等待任務(wù)隊(duì)列中新的任務(wù)添加進(jìn)來(lái)后,

condition變量通過(guò)notify_one()隨機(jī)喚醒一個(gè)在wait的線程,取出隊(duì)列中的任務(wù)執(zhí)行。

寫這個(gè)線程池的過(guò)程中碰到的最主要需要注意的就是鎖的使用,在對(duì)隊(duì)列的寫和釋放時(shí)要注意加鎖,在需要阻塞線程時(shí),要注意通過(guò){}設(shè)置鎖的范圍。

IsStart是原子的,所以在寫這個(gè)變量的時(shí)候沒(méi)有另外加鎖。

以上就是關(guān)于C++如何實(shí)現(xiàn)線程池的內(nèi)容,如果你們有學(xué)習(xí)到知識(shí)或者技能,可以把它分享出去讓更多的人看到。

向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI