溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

TP框架中的消息隊列實現(xiàn)

發(fā)布時間:2024-08-27 02:07:52 來源:億速云 閱讀:81 作者:小樊 欄目:編程語言

TP(Thread Pool,線程池)框架是一種用于管理和調(diào)度線程資源的技術(shù),它可以提高系統(tǒng)性能,減少線程創(chuàng)建和銷毀的開銷。在TP框架中,消息隊列是一種常用的任務調(diào)度和處理機制。以下是一個簡單的消息隊列實現(xiàn):

  1. 首先,定義一個任務類(Task),它包含了要執(zhí)行的任務信息和任務處理函數(shù)。
class Task {
public:
    virtual void execute() = 0; // 純虛函數(shù),子類需要實現(xiàn)具體的任務處理邏輯
};
  1. 創(chuàng)建一個消息隊列類(MessageQueue),它使用一個線程安全的隊列(如std::queue)來存儲任務對象。
#include<queue>
#include <mutex>
#include<condition_variable>

class MessageQueue {
public:
    void push(Task* task) {
        std::unique_lock<std::mutex> lock(mutex_);
        queue_.push(task);
        cv_.notify_one(); // 通知等待的線程有新任務到來
    }

    Task* pop() {
        std::unique_lock<std::mutex> lock(mutex_);
        cv_.wait(lock, [this] { return !queue_.empty(); }); // 當隊列為空時,線程等待
        Task* task = queue_.front();
        queue_.pop();
        return task;
    }

private:
    std::queue<Task*> queue_;
    std::mutex mutex_;
    std::condition_variable cv_;
};
  1. 創(chuàng)建一個線程池類(ThreadPool),它包含了多個工作線程,這些線程會從消息隊列中獲取任務并執(zhí)行。
#include<vector>
#include<thread>

class ThreadPool {
public:
    ThreadPool(size_t num_threads, MessageQueue& message_queue) : message_queue_(message_queue) {
        for (size_t i = 0; i < num_threads; ++i) {
            threads_.emplace_back(&ThreadPool::worker, this);
        }
    }

    ~ThreadPool() {
        for (auto& thread : threads_) {
            if (thread.joinable()) {
                thread.join();
            }
        }
    }

    void add_task(Task* task) {
        message_queue_.push(task);
    }

private:
    void worker() {
        while (true) {
            Task* task = message_queue_.pop();
            if (task) {
                task->execute();
                delete task; // 任務執(zhí)行完畢后,釋放任務對象內(nèi)存
            }
        }
    }

    std::vector<std::thread> threads_;
    MessageQueue& message_queue_;
};
  1. 使用示例:
class MyTask : public Task {
public:
    MyTask(int data) : data_(data) {}

    void execute() override {
        // 在這里實現(xiàn)具體的任務處理邏輯
        std::cout << "Processing task with data: "<< data_<< std::endl;
    }

private:
    int data_;
};

int main() {
    MessageQueue message_queue;
    ThreadPool thread_pool(4, message_queue); // 創(chuàng)建一個包含4個工作線程的線程池

    // 添加任務到線程池
    for (int i = 0; i < 10; ++i) {
        thread_pool.add_task(new MyTask(i));
    }

    // 主線程等待用戶輸入,以便觀察任務執(zhí)行情況
    std::cin.get();

    return 0;
}

這個簡單的TP框架中的消息隊列實現(xiàn)可以根據(jù)實際需求進行擴展和優(yōu)化。例如,可以添加任務優(yōu)先級、限制隊列大小等功能。

向AI問一下細節(jié)

免責聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI