溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

C++怎么實(shí)現(xiàn)RPC網(wǎng)絡(luò)通訊

發(fā)布時(shí)間:2023-04-27 15:00:32 來源:億速云 閱讀:141 作者:iii 欄目:開發(fā)技術(shù)

本篇內(nèi)容主要講解“C++怎么實(shí)現(xiàn)RPC網(wǎng)絡(luò)通訊”,感興趣的朋友不妨來看看。本文介紹的方法操作簡(jiǎn)單快捷,實(shí)用性強(qiáng)。下面就讓小編來帶大家學(xué)習(xí)“C++怎么實(shí)現(xiàn)RPC網(wǎng)絡(luò)通訊”吧!

一、RPC簡(jiǎn)介

1.1 簡(jiǎn)介

RPC指的是計(jì)算機(jī)A的進(jìn)程調(diào)用另外一臺(tái)計(jì)算機(jī)B的進(jìn)程,A上的進(jìn)程被掛起,B上被調(diào)用的進(jìn)程開始執(zhí)行,當(dāng)B執(zhí)行完畢后將執(zhí)行結(jié)果返回給A,A的進(jìn)程繼續(xù)執(zhí)行。調(diào)用方可以通過使用參數(shù)將信息傳送給被調(diào)用方,然后通過傳回的結(jié)果得到信息。這些傳遞的信息都是被加密過或者其他方式處理。這個(gè)過程對(duì)開發(fā)人員是透明的,因此RPC可以看作是本地過程調(diào)用的一種擴(kuò)展,使被調(diào)用過程不必與調(diào)用過程位于同一物理機(jī)中。

RPC可以用于構(gòu)建基于B/S模式的分布式應(yīng)用程序:請(qǐng)求服務(wù)是一個(gè)客戶端、而服務(wù)提供程序是一臺(tái)服務(wù)器。和常規(guī)和本地的調(diào)用過程一樣,遠(yuǎn)程過程調(diào)用是同步操作,在結(jié)果返回之前,需要暫時(shí)中止請(qǐng)求程序。

RPC的優(yōu)點(diǎn):

  • 支持面向過程和面向線程的模型;

  • 內(nèi)部消息傳遞機(jī)制對(duì)用戶隱藏;

  • 基于 RPC 模式的開發(fā)可以減少代碼重寫;

  • 可以在本地環(huán)境和分布式環(huán)境中運(yùn)行;

1.2 本地調(diào)用和遠(yuǎn)程調(diào)用的區(qū)別

以ARM環(huán)境為例,我們拆解本地調(diào)用的過程,以下面代碼為例:

int selfIncrement(int a)
{
    return a + 1;
}
int a = 10;

當(dāng)執(zhí)行到selfIncrement(a)時(shí),首先把a(bǔ)存入寄存器R0,之后轉(zhuǎn)到函數(shù)地址selfIncrement,執(zhí)行函數(shù)內(nèi)的指令 ADD R0,#1。跳轉(zhuǎn)到函數(shù)的地址偏移量在編譯時(shí)確定。

但是如果這是一個(gè)遠(yuǎn)程調(diào)用,selfIncrement函數(shù)存在于其他機(jī)器,為了實(shí)現(xiàn)遠(yuǎn)程調(diào)用,請(qǐng)求方和服務(wù)方需要提供需要解決以下問題:

1. 網(wǎng)絡(luò)傳輸。

本地調(diào)用的參數(shù)存放在寄存器或棧中,在同一塊內(nèi)存中,可以直接訪問到。遠(yuǎn)程過程調(diào)用需要借助網(wǎng)絡(luò)來傳遞參數(shù)和需要調(diào)用的函數(shù) ID。

2. 編解碼

請(qǐng)求方需要將參數(shù)轉(zhuǎn)化為字節(jié)流,服務(wù)提供方需要將字節(jié)流轉(zhuǎn)化為參數(shù)。

3. 函數(shù)映射表

服務(wù)提供方的函數(shù)需要有唯一的 ID 標(biāo)識(shí),請(qǐng)求方通過 ID 標(biāo)識(shí)告知服務(wù)提供方需要調(diào)用哪個(gè)函數(shù)。

以上三個(gè)功能即為 RPC 的基本框架所必須包含的功能。

1.3 RPC運(yùn)行的流程

一次 RPC 調(diào)用的運(yùn)行流程大致分為如下七步,具體如下圖所示。

1.客戶端調(diào)用客戶端存根程序,將參數(shù)傳入;

2.客戶端存根程序?qū)?shù)轉(zhuǎn)化為標(biāo)準(zhǔn)格式,并編組進(jìn)消息;

3.客戶端存根程序?qū)⑾l(fā)送到傳輸層,傳輸層將消息傳送至遠(yuǎn)程服務(wù)器;

4.服務(wù)器的傳輸層將消息傳遞到服務(wù)器存根程序,存根程序?qū)﹃U述進(jìn)行解包,并使用本地調(diào)用的機(jī)制調(diào)用所需的函數(shù);

5.運(yùn)算完成之后,將結(jié)果返回給服務(wù)器存根,存根將結(jié)果編組為消息,之后發(fā)送給傳輸層;

6.服務(wù)器傳輸層將結(jié)果消息發(fā)送給客戶端傳輸層;

7.客戶端存根對(duì)返回消息解包,并返回給調(diào)用方。

C++怎么實(shí)現(xiàn)RPC網(wǎng)絡(luò)通訊

服務(wù)端存根和客戶端存根可以看做是被封裝起來的細(xì)節(jié),這些細(xì)節(jié)對(duì)于開發(fā)人員來說是透明的,但是在客戶端層面看到的是 “本地” 調(diào)用了 selfIncrement() 方法,在服務(wù)端層面,則需要封裝、網(wǎng)絡(luò)傳輸、解封裝等等操作。因此 RPC 可以看作是傳統(tǒng)本地過程調(diào)用的一種擴(kuò)展,其使得被調(diào)用過程不必與調(diào)用過程位于同一物理機(jī)中。

1.4 小結(jié)

RPC 的目標(biāo)是做到在遠(yuǎn)程機(jī)器上調(diào)用函數(shù)與本地調(diào)用函數(shù)一樣的體驗(yàn)。 為了達(dá)到這個(gè)目的,需要實(shí)現(xiàn)網(wǎng)絡(luò)傳輸、序列化與反序列化、函數(shù)映射表等功能,其中網(wǎng)絡(luò)傳輸可以使用socket或其他,序列化和反序列化可以使用protobuf,函數(shù)映射表可以使用std::function。

lambda與std::function內(nèi)容可以看:

C++11 匿名函數(shù)lambda的使用

C++11 std::function 基礎(chǔ)用法

lambda 表達(dá)式和 std::function 的功能是類似的,lambda 表達(dá)式可以轉(zhuǎn)換為 std::function,一般情況下,更多使用 lambda 表達(dá)式,只有在需要回調(diào)函數(shù)的情況下才會(huì)使用 std::function。

二、RPC簡(jiǎn)單實(shí)現(xiàn)

2.1 客戶端實(shí)現(xiàn)代碼

#include <iostream>
#include <memory>
#include <thread>
#include <functional>
#include <cstring>
 
class RPCClient
{
public:
    using RPCCallback = std::function<void(const std::string&)>;
    RPCClient(const std::string& server_address) : server_address_(server_address) {}
    ~RPCClient() {}
 
    void Call(const std::string& method, const std::string& request, RPCCallback callback)
    {
        // 序列化請(qǐng)求數(shù)據(jù)
        std::string data = Serialize(method, request);
        // 發(fā)送請(qǐng)求
        SendRequest(data);
        // 開啟線程接收響應(yīng)
        std::thread t([this, callback]() {
            std::string response = RecvResponse();
            // 反序列化響應(yīng)數(shù)據(jù)
            std::string result = Deserialize(response);
            callback(result);
        });
        t.detach();
    }
 
private:
    std::string Serialize(const std::string& method, const std::string& request)
    {
        // 省略序列化實(shí)現(xiàn)
    }
 
    void SendRequest(const std::string& data)
    {
        // 省略網(wǎng)絡(luò)發(fā)送實(shí)現(xiàn)
    }
 
    std::string RecvResponse()
    {
        // 省略網(wǎng)絡(luò)接收實(shí)現(xiàn)
    }
 
    std::string Deserialize(const std::string& response)
    {
        // 省略反序列化實(shí)現(xiàn)
    }
 
private:
    std::string server_address_;
};
 
int main()
{
    std::shared_ptr<RPCClient> client(new RPCClient("127.0.0.1:8000"));
    client->Call("Add", "1,2", [](const std::string& result) {
        std::cout << "Result: " << result << std::endl;
    });
    return 0;
}

這段代碼定義了RPCClient類來處理客戶端的請(qǐng)求任務(wù),用到了lambda和std::function來處理函數(shù)調(diào)用,在Call中使用多線程技術(shù)。main中使用智能指針管理Rpcclient類,并調(diào)用了客戶端的Add函數(shù)。 

127.0.0.1為本地地址,對(duì)開發(fā)來說需要使用本地地址自測(cè),端口號(hào)為8000,需要選擇一個(gè)空閑端口來通信。

2.2 服務(wù)端代碼

下面是服務(wù)端的實(shí)現(xiàn)

#include <iostream>
#include <map>
#include <functional>
#include <memory>
#include <thread>
#include <mutex>
// 使用第三方庫實(shí)現(xiàn)序列化和反序列化
#include <boost/serialization/serialization.hpp>
#include <boost/serialization/map.hpp>
using namespace std;
// 定義RPC函數(shù)類型
using RPCCallback = std::function<std::string(const std::string&)>;
class RPCHandler {
public:
    void registerCallback(const std::string& name, RPCCallback callback) {
        std::unique_lock<std::mutex> lock(mtx_);
        callbacks_[name] = callback;
    }
    std::string handleRequest(const std::string& request) {
        // 反序列化請(qǐng)求
        std::map<std::string, std::string> requestMap;
        std::istringstream is(request);
        boost::archive::text_iarchive ia(is);
        ia >> requestMap;
        // 查找并調(diào)用對(duì)應(yīng)的回調(diào)函數(shù)
        std::string name = requestMap["name"];
        std::string args = requestMap["args"];
        std::unique_lock<std::mutex> lock(mtx_);
        auto it = callbacks_.find(name);
        if (it == callbacks_.end()) {
            return "Error: Unknown function";
        }
        RPCCallback callback = it->second;
        return callback(args);
    }
private:
    std::map<std::string, RPCCallback> callbacks_;
    std::mutex mtx_;
};
int main() {
    RPCHandler rpcHandler;
    // 注冊(cè)回調(diào)函數(shù)
    rpcHandler.registerCallback("add", [](const std::string& args) {
        std::istringstream is(args);
        int a, b;
        is >> a >> b;
        int result = a + b;
        std::ostringstream os;
        os << result;
        return os.str();
    });
    rpcHandler.registerCallback("sub", [](const std::string& args) {
        std::istringstream is(args);
        int a, b;
        is >> a >> b;
        int result = a - b;
        std::ostringstream os;
        os << result;
        return os.str
    });
    // 創(chuàng)建處理請(qǐng)求的線程
    std::thread requestThread([&]() {
        while (true) {
            std::string request;
            std::cin >> request;
            std::string response = rpcHandler.handleRequest(request);
            std::cout << response << std::endl;
        }
    });
    requestThread.join();
    return 0;
}

上面的代碼實(shí)現(xiàn)了一個(gè)簡(jiǎn)單的C++ RPC服務(wù)端。主要實(shí)現(xiàn)了以下功能:

1.定義了RPC函數(shù)類型 RPCCallback,使用std::function<std::string(const std::string&)>表示。

2.RPCHandler類實(shí)現(xiàn)了注冊(cè)函數(shù)和處理請(qǐng)求的功能。

3.在main函數(shù)中創(chuàng)建了一個(gè)RPCHandler對(duì)象,并注冊(cè)了兩個(gè)函數(shù)"add" 和 "sub"。這些函數(shù)通過lambda表達(dá)式實(shí)現(xiàn),并在被調(diào)用時(shí)通過std::istringstream讀取參數(shù)并返回結(jié)果。

4.創(chuàng)建了一個(gè)新線程requestThread來處理請(qǐng)求。在這個(gè)線程中,通過std::cin讀取請(qǐng)求,然后調(diào)用RPCHandler的handleRequest函數(shù)并使用std::cout輸出響應(yīng)。

C++怎么實(shí)現(xiàn)RPC網(wǎng)絡(luò)通訊

注意,這套代碼是最簡(jiǎn)單的RPC機(jī)制,只能調(diào)用本地的資源,他還存在以下缺點(diǎn):

1.代碼并沒有處理錯(cuò)誤處理,如果請(qǐng)求格式不正確或函數(shù)不存在,服務(wù)端將會(huì)返回“Error: Unknown function”。

2.沒有使用網(wǎng)絡(luò)庫進(jìn)行通信,所以只能在本機(jī)上使用。

3.沒有提供高效的并發(fā)性能,所有請(qǐng)求都在單獨(dú)的線程中處理。

4.沒有考慮RPC服務(wù)的可用性和高可用性,如果服務(wù)端崩潰或不可用,客戶端將無法繼續(xù)使用服務(wù)。

5.沒有考慮RPC服務(wù)的可擴(kuò)展性,如果有大量請(qǐng)求需要處理,可能會(huì)導(dǎo)致性能問題。

6.使用了第三方庫Boost.Serialization來實(shí)現(xiàn)序列化和反序列化,如果不想使用第三方庫,可能需要自己實(shí)現(xiàn)序列化的功能。

下面我們一步一步完善它。

三、加強(qiáng)版RPC(以“RPC簡(jiǎn)單實(shí)現(xiàn)”為基礎(chǔ))

3.1 加入錯(cuò)誤處理

下面是 RPCHandler 類中加入錯(cuò)誤處理的代碼示例:

class RPCHandler {
public:
    // 其他代碼...
    std::string handleRequest(const std::string& request) {
        // 反序列化請(qǐng)求
        std::map<std::string, std::string> requestMap;
        std::istringstream is(request);
        boost::archive::text_iarchive ia(is);
        ia >> requestMap;
        // 查找并調(diào)用對(duì)應(yīng)的回調(diào)函數(shù)
        std::string name = requestMap["name"];
        std::string args = requestMap["args"];
        std::unique_lock<std::mutex> lock(mtx_);
        auto it = callbacks_.find(name);
        if (it == callbacks_.end()) {
            return "Error: Unknown function";
        }
        RPCCallback callback = it->second;
        try {
            return callback(args);
        } catch (const std::exception& e) {
            return "Error: Exception occurred: " + std::string(e.what());
        } catch (...) {
            return "Error: Unknown exception occurred";
        }
    }
};

上面的代碼在 RPCHandler 類的 handleRequest 函數(shù)中加入了錯(cuò)誤處理的代碼,它使用了 try-catch 語句來捕獲可能發(fā)生的異常。如果找不到對(duì)應(yīng)的函數(shù)或發(fā)生了異常,會(huì)返回錯(cuò)誤信息。這樣,如果請(qǐng)求格式不正確或函數(shù)不存在,服務(wù)端將會(huì)返回相應(yīng)的錯(cuò)誤信息。

3.2 加入網(wǎng)絡(luò)連接(socket)

加入網(wǎng)絡(luò)連接不需要?jiǎng)臃?wù)端的實(shí)現(xiàn),只需要在main里創(chuàng)造套接字去鏈接就好:

int main() 
{
    io_context ioc;
    ip::tcp::acceptor acceptor(ioc, ip::tcp::endpoint(ip::tcp::v4(), 8080));
    RPCHandler rpcHandler;
    // 注冊(cè)函數(shù)
    rpcHandler.registerCallback("add", [](const std::string& args) {
        std::istringstream is(args);
        int a, b;
        is >> a >> b;
        int result = a + b;
        std::ostringstream os;
        os << result;
        return os.str();
    });
    rpcHandler.registerCallback("sub", [](const std::string& args) {
        std::istringstream is(args);
        int a, b;
        is >> a >> b;
        int result = a - b;
        std::ostringstream os;
        os << result;
        return os.str();
    });
    // 等待連接
    while (true) {
        ip::tcp::socket socket(ioc);
        acceptor.accept(socket);
        // 創(chuàng)建線程處理請(qǐng)求
        std::thread requestThread([&](ip::tcp::socket socket) {
            while (true) {
                // 讀取請(qǐng)求
                boost::asio::streambuf buf;
                read_until(socket, buf, '\n');
                std::string request = boost::asio::buffer_cast<const char*>(buf.data());
                request.pop_back();
                // 處理請(qǐng)求
                std::string response = rpcHandler.handleRequest(request);
                // 發(fā)送響應(yīng)
                write(socket, buffer(response + '\n'));
            }
        }, std::move(socket));
        requestThread.detach();
    }
    return 0;
}

這是一個(gè)使用Boost.Asio庫實(shí)現(xiàn)的RPC服務(wù)端代碼示例。它使用了TCP協(xié)議監(jiān)聽8080端口,等待客戶端的連接。當(dāng)有客戶端連接時(shí),創(chuàng)建一個(gè)新線程來處理請(qǐng)求。請(qǐng)求和響應(yīng)通過網(wǎng)絡(luò)傳輸。

3.3 加強(qiáng)并發(fā)性

使用并發(fā)和異步機(jī)制,忽略重復(fù)代碼,實(shí)現(xiàn)如下:

class RPCHandler {
public:
    // ...
    void handleConnection(ip::tcp::socket socket) {
        while (true) {
            // 讀取請(qǐng)求
            boost::asio::streambuf buf;
            read_until(socket, buf, '\n');
            std::string request = boost::asio::buffer_cast<const char*>(buf.data());
            request.pop_back();
            // 使用并行執(zhí)行處理請(qǐng)求
            std::vector<std::future<std::string>> futures;
            for (int i = 0; i < request.size(); i++) {
                futures.emplace_back(std::async(std::launch::async, &RPCHandler::handleRequest, this, request[i]));
            }
            // 等待所有請(qǐng)求處理完成并發(fā)送響應(yīng)
            for (auto& f : futures) {
                std::string response = f.get();
                write(socket, buffer(response + '\n'));
            }
        }
    }
};

這樣,請(qǐng)求會(huì)被分成多個(gè)部分并行處理,可以利用多核 CPU 的優(yōu)勢(shì)提高服務(wù)端的并發(fā)性能。

main():

int main() {
    io_context ioc;
    ip::tcp::acceptor acceptor(ioc, ip::tcp::endpoint(ip::tcp::v4(), 8080));
    RPCHandler rpcHandler;
    // 注冊(cè)函數(shù)
    rpcHandler.registerCallback("add", [](const std::string& args) {
        std::istringstream is(args);
        int a, b;
        is >> a >> b;
        int result = a + b;
        std::ostringstream os;
        os << result;
        return os.str();
    });
    rpcHandler.registerCallback("sub", [](const std::string& args) {
        std::istringstream is(args);
        int a, b;
        is >> a >> b;
        int result = a - b;
        std::ostringstream os;
        os << result;
        return os.str();
    });
    // 創(chuàng)建線程池
    boost::thread_pool::executor pool(10);
    // 等待連接
    while (true) {
        ip::tcp::socket socket(ioc);
        acceptor.accept(socket);
        // 將請(qǐng)求添加到線程池中處理
        pool.submit(boost::bind(&RPCHandler::handleConnection, &rpcHandler, std::move(socket)));
    }
    return 0;
}

在 main 函數(shù)中可以使用 boost::thread_pool::executor 來管理線程池,在線程池中提交任務(wù)來處理請(qǐng)求。這里的線程池大小設(shè)置為10,可以根據(jù)實(shí)際情況調(diào)整。

3.4 加入容錯(cuò)機(jī)制(修改客戶端部分)

在其中使用了重試機(jī)制來保證客戶端能夠重新連接服務(wù)端:

class RPCClient {
public:
    RPCClient(const std::string& address, int port) : address_(address), port_(port), socket_(io_context_) {
        connect();
    }
    std::string call(const std::string& name, const std::string& args) {
        // 序列化請(qǐng)求
        std::ostringstream os;
        boost::archive::text_oarchive oa(os);
        std::map<std::string, std::string> request;
        request["name"] = name;
        request["args"] = args;
        oa << request;
        std::string requestStr = os.str();
        // 發(fā)送請(qǐng)求
        write(socket_, buffer(requestStr + '\n'));
        // 讀取響應(yīng)
        boost::asio::streambuf buf;
        read_until(socket_, buf, '\n');
        std::string response = boost::asio::buffer_cast<const char*>(buf.data());
        response.pop_back();
        return response;
    }
private:
    void connect() {
        bool connected = false;
        while (!connected) {
            try {
                socket_.connect(ip::tcp::endpoint(ip::address::from_string(address_), port_));
                connected = true;
            } catch (const std::exception& e) {
                std::cerr << "Error connecting to server: " << e.what() << std::endl;
                std::this_thread::sleep_for(std::chrono::seconds(1));
            }
        }
    }
    std::string address_;
    int port_;
    io_context io_context_;
    ip::tcp::socket socket_;
};

在這個(gè)示例中,當(dāng)連接服務(wù)端失敗時(shí),客戶端會(huì)在一定的時(shí)間間隔后重試連接,直到成功連接上服務(wù)端為止。

到此,相信大家對(duì)“C++怎么實(shí)現(xiàn)RPC網(wǎng)絡(luò)通訊”有了更深的了解,不妨來實(shí)際操作一番吧!這里是億速云網(wǎng)站,更多相關(guān)內(nèi)容可以進(jìn)入相關(guān)頻道進(jìn)行查詢,關(guān)注我們,繼續(xù)學(xué)習(xí)!

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI