溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

Protobuf協(xié)議精品應(yīng)用

發(fā)布時(shí)間:2020-08-03 08:33:50 來源:網(wǎng)絡(luò) 閱讀:479 作者:鐵芒箕 欄目:軟件技術(shù)

??Protobuf應(yīng)用廣泛,尤其作為網(wǎng)絡(luò)通訊協(xié)議最為普遍。本文將詳細(xì)描述幾個(gè)讓人眼前一亮的protobuf協(xié)議設(shè)計(jì),對(duì)準(zhǔn)備應(yīng)用或已經(jīng)應(yīng)用protobuf的開發(fā)者會(huì)有所啟發(fā),甚至可以直接拿過去用。 這里描述的協(xié)議設(shè)計(jì)被用于生產(chǎn)環(huán)境的即時(shí)通訊、埋點(diǎn)數(shù)據(jù)采集、消息推送、redismysql數(shù)據(jù)代理。

??Bwar從2013年開始應(yīng)用protobuf,2014年設(shè)計(jì)了用于mysql數(shù)據(jù)代理的protobuf協(xié)議,2015年設(shè)計(jì)了用于即時(shí)通訊的protobuf協(xié)議。高性能C++ IoC網(wǎng)絡(luò)框架Nebula https://github.com/Bwar/Nebula把這幾個(gè)protobuf協(xié)議設(shè)計(jì)應(yīng)用到了極致。

1. TCP通訊協(xié)議設(shè)計(jì)

??本協(xié)議設(shè)計(jì)于2015年,用于一個(gè)生產(chǎn)環(huán)境的IM和埋點(diǎn)數(shù)據(jù)采集及實(shí)時(shí)分析,2016年又延伸發(fā)展了基于protobuf3的版本并用于開源網(wǎng)絡(luò)框架Nebula?;趐rotobuf2和protobuf3的有較少差別,這里分開講解兩個(gè)版本的協(xié)議設(shè)計(jì)。

1.1. protobuf2.5版Msg

??2015年尚無protobuf3的release版本,protobuf2版本的fixed32類型是固定占用4個(gè)字節(jié)的,非常適合用于網(wǎng)絡(luò)通訊協(xié)議設(shè)計(jì)。Bwar設(shè)計(jì)用于IM系統(tǒng)的協(xié)議包括兩個(gè)protobuf message:MsgHead和MsgBody,協(xié)議定義如下:

```C++
syntax = "proto2";

/**

  • @brief 消息頭
    */
    message MsgHead
    {
    required fixed32 cmd = 1 ; ///< 命令字(壓縮加密算法占高位1字節(jié))
    required fixed32 msgbody_len = 2; ///< 消息體長度(單個(gè)消息體長度不能超過65535即8KB)
    required fixed32 seq = 3; ///< 序列號(hào)
    }

/**

  • @brief 消息體
  • @note 消息體主體是body,所有業(yè)務(wù)邏輯內(nèi)容均放在body里。session_id和session用于接入層路由,
  • 兩者只需要填充一個(gè)即可,首選session_id,當(dāng)session_id用整型無法表達(dá)時(shí)才使用session。
    */
    message MsgBody
    {
    required bytes body = 1; ///< 消息體主體
    optional uint32 session_id = 2; ///< 會(huì)話ID(單聊消息為接收者uid,個(gè)人信息修改為uid,群聊消息為groupid,群管理為groupid)
    optional string session = 3; ///< 會(huì)話ID(當(dāng)session_id用整型無法表達(dá)時(shí)使用)
    optional bytes additional = 4; ///< 接入層附加的數(shù)據(jù)(客戶端無須理會(huì))
    }

??解析收到的字節(jié)流時(shí)先解固定長度(15字節(jié))的MsgHead(protobuf3.0之后的版本必須在cmd、msgbody_len、seq均不為0的情況下才是15字節(jié)),再通過MsgHead里的msgbody_len判斷消息體是否接收完畢,若接收完畢則調(diào)用MsgBody.Parse()解析。MsgBody里的設(shè)計(jì)在下一節(jié)詳細(xì)說明。

??MsgHead在實(shí)際的項(xiàng)目應(yīng)用中對(duì)應(yīng)下面的消息頭并可以相互轉(zhuǎn)換:

```C++
#pragma pack(1)

/**

  • @brief 與客戶端通信消息頭
    */
    struct tagClientMsgHead
    {
    unsigned char version; ///< 協(xié)議版本號(hào)(1字節(jié))
    unsigned char encript; ///< 壓縮加密算法(1字節(jié))
    unsigned short cmd; ///< 命令字/功能號(hào)(2字節(jié))
    unsigned short checksum; ///< 校驗(yàn)碼(2字節(jié))
    unsigned int body_len; ///< 消息體長度(4字節(jié))
    unsigned int seq; ///< 序列號(hào)(4字節(jié))
    };

#pragma pack()


??轉(zhuǎn)換代碼如下:
```C++
E_CODEC_STATUS ClientMsgCodec::Encode(const MsgHead& oMsgHead, const MsgBody& oMsgBody, loss::CBuffer* pBuff)
{
    tagClientMsgHead stClientMsgHead;
    stClientMsgHead.version = 1;        // version暫時(shí)無用
    stClientMsgHead.encript = (unsigned char)(oMsgHead.cmd() >> 24);
    stClientMsgHead.cmd = htons((unsigned short)(gc_uiCmdBit & oMsgHead.cmd()));
    stClientMsgHead.body_len = htonl((unsigned int)oMsgHead.msgbody_len());
    stClientMsgHead.seq = htonl(oMsgHead.seq());
    stClientMsgHead.checksum = htons((unsigned short)stClientMsgHead.checksum);
    ...
}

E_CODEC_STATUS ClientMsgCodec::Decode(loss::CBuffer* pBuff, MsgHead& oMsgHead, MsgBody& oMsgBody)
{
    LOG4_TRACE("%s() pBuff->ReadableBytes() = %u", __FUNCTION__, pBuff->ReadableBytes());
    size_t uiHeadSize = sizeof(tagClientMsgHead);
    if (pBuff->ReadableBytes() >= uiHeadSize)
    {
        tagClientMsgHead stClientMsgHead;
        int iReadIdx = pBuff->GetReadIndex();
        pBuff->Read(&stClientMsgHead, uiHeadSize);
        stClientMsgHead.cmd = ntohs(stClientMsgHead.cmd);
        stClientMsgHead.body_len = ntohl(stClientMsgHead.body_len);
        stClientMsgHead.seq = ntohl(stClientMsgHead.seq);
        stClientMsgHead.checksum = ntohs(stClientMsgHead.checksum);
        LOG4_TRACE("cmd %u, seq %u, len %u, pBuff->ReadableBytes() %u",
                        stClientMsgHead.cmd, stClientMsgHead.seq, stClientMsgHead.body_len,
                        pBuff->ReadableBytes());
        oMsgHead.set_cmd(((unsigned int)stClientMsgHead.encript << 24) | stClientMsgHead.cmd);
        oMsgHead.set_msgbody_len(stClientMsgHead.body_len);
        oMsgHead.set_seq(stClientMsgHead.seq);
        ...
    }
}

<br/>

1.2. protobuf3版Msg

??protobuf3版的MsgHead和MsgBody從IM業(yè)務(wù)應(yīng)用實(shí)踐中發(fā)展而來,同時(shí)滿足了埋點(diǎn)數(shù)據(jù)采集、實(shí)時(shí)計(jì)算、消息推送等業(yè)務(wù)需要,更為通用。正因其通用性和高擴(kuò)展性,采用proactor模型的IoC網(wǎng)絡(luò)框架Nebula才會(huì)選用這個(gè)協(xié)議,通過這個(gè)協(xié)議,框架層將網(wǎng)絡(luò)通信工作從業(yè)務(wù)應(yīng)用中完全獨(dú)立出來,基于Nebula框架的應(yīng)用開發(fā)者甚至可以不懂網(wǎng)絡(luò)編程也能開發(fā)出高并發(fā)的分布式服務(wù)。

??MsgHead和MsgBody的protobuf定義如下:

```C++
syntax = "proto3";

// import "google/protobuf/any.proto";

/**

  • @brief 消息頭
  • @note MsgHead為固定15字節(jié)的頭部,當(dāng)MsgHead不等于15字節(jié)時(shí),消息發(fā)送將出錯(cuò)。
  • 在proto2版本,MsgHead為15字節(jié)總是成立,cmd、seq、len都是required;
  • 但proto3版本,MsgHead為15字節(jié)則必須要求cmd、seq、len均不等于0,否則無法正確進(jìn)行收發(fā)編解碼。
    */
    message MsgHead
    {
    fixed32 cmd = 1; ///< 命令字(壓縮加密算法占高位1字節(jié))
    fixed32 seq = 2; ///< 序列號(hào)
    sfixed32 len = 3; ///< 消息體長度
    }

/**

  • @brief 消息體
  • @note 消息體主體是data,所有業(yè)務(wù)邏輯內(nèi)容均放在data里。req_target是請(qǐng)求目標(biāo),用于
  • 服務(wù)端接入路由,請(qǐng)求包必須填充。rsp_result是響應(yīng)結(jié)果,響應(yīng)包必須填充。
    */
    message MsgBody
    {
    oneof msg_type
    {
    Request req_target = 1; ///< 請(qǐng)求目標(biāo)(請(qǐng)求包必須填充)
    Response rsp_result = 2; ///< 響應(yīng)結(jié)果(響應(yīng)包必須填充)
    }
    bytes data = 3; ///< 消息體主體
    bytes add_on = 4; ///< 服務(wù)端接入層附加在請(qǐng)求包的數(shù)據(jù)(客戶端無須理會(huì))
    string trace_id = 5; ///< for log trace

    message Request
    {
    uint32 route_id = 1; ///< 路由ID
    string route = 2; ///< 路由ID(當(dāng)route_id用整型無法表達(dá)時(shí)使用)
    }

    message Response
    {
    int32 code = 1; ///< 錯(cuò)誤碼
    bytes msg = 2; ///< 錯(cuò)誤信息
    }
    }

??MsgBody的data字段存儲(chǔ)消息主體,任何自定義數(shù)據(jù)均可以二進(jìn)制數(shù)據(jù)流方式寫入到data。

??msg_type用于標(biāo)識(shí)該消息是請(qǐng)求還是響應(yīng)(所有網(wǎng)絡(luò)數(shù)據(jù)流都可歸為請(qǐng)求或響應(yīng)),如果是請(qǐng)求,則可以選擇性填充Request里的route_id或route,如果填充了,則框架層無須解析應(yīng)用層協(xié)議(也無法解析)就能自動(dòng)根據(jù)路由ID轉(zhuǎn)發(fā),而無須應(yīng)用層解開data里的內(nèi)容再根據(jù)自定義邏輯轉(zhuǎn)發(fā)。如果是響應(yīng),則定義了統(tǒng)一的錯(cuò)誤標(biāo)準(zhǔn),也為業(yè)務(wù)無關(guān)的錯(cuò)誤處理提供方便。

??add_on是附在長連接上的業(yè)務(wù)數(shù)據(jù),框架并不會(huì)解析但會(huì)在每次轉(zhuǎn)發(fā)消息時(shí)帶上,可以為應(yīng)用提供極其方便且強(qiáng)大的功能。比如,IM用戶登錄時(shí)客戶端只發(fā)送用戶ID和密碼到服務(wù)端,服務(wù)端在登錄校驗(yàn)通過后,將該用戶的昵稱、頭像等信息通過框架提供的方法SetClientData()將數(shù)據(jù)附在服務(wù)端接入層該用戶對(duì)應(yīng)的長連接Channel上,之后所有從該連接過來的請(qǐng)求都會(huì)由框架層自動(dòng)填充add_on字段,服務(wù)端的其他邏輯服務(wù)器只從data中得到自定義業(yè)務(wù)邏輯(比如聊天消息)數(shù)據(jù),卻可以從add_on中得到這個(gè)發(fā)送用戶的信息。add_on的設(shè)計(jì)簡化了應(yīng)用開發(fā)邏輯,并降低了客戶端與服務(wù)端傳輸?shù)臄?shù)據(jù)量。

??trace_id用于分布式日志跟蹤。分布式服務(wù)的錯(cuò)誤定位是相當(dāng)麻煩的,Nebula分布式服務(wù)解決方案提供了日志跟蹤功能,協(xié)議里的trace_id字段的設(shè)計(jì)使得Nebula框架可以在完全不增加應(yīng)用開發(fā)者額外工作的情況下(正常調(diào)用LOG4_INFO寫日志而無須額外工作)實(shí)現(xiàn)所有標(biāo)記著同一trace_id的日志發(fā)送到指定一臺(tái)日志服務(wù)器,定義錯(cuò)誤時(shí)跟單體服務(wù)那樣登錄一臺(tái)服務(wù)器查看日志即可。比如,IM用戶發(fā)送一條消息失敗,在用戶發(fā)送的消息到達(dá)服務(wù)端接入層時(shí)就被打上了trace_id標(biāo)記,這個(gè)id會(huì)一直傳遞到邏輯層、存儲(chǔ)層等,哪個(gè)環(huán)節(jié)發(fā)生了錯(cuò)誤都可以從消息的發(fā)送、轉(zhuǎn)發(fā)、處理路徑上查到。

??MsgHead和MsgBody的編解碼實(shí)現(xiàn)見Nebula框架的https://github.com/Bwar/Nebula/blob/master/src/codec/CodecProto.cpp。

2. Http通訊協(xié)議設(shè)計(jì)

??上面的講解的是protobuf應(yīng)用于TCP數(shù)據(jù)流通信,接下來將描述protobuf在http通信上的應(yīng)用。

??在Web服務(wù)中通常會(huì)用Nginx做接入層的反向代理,經(jīng)過Nginx轉(zhuǎn)發(fā)到后續(xù)業(yè)務(wù)邏輯層的tomcat、apache或nginx上,接入層和業(yè)務(wù)邏輯層至少做了兩次http協(xié)議解析,http協(xié)議是文本協(xié)議,傳輸數(shù)據(jù)量大解析速度慢。Nebula框架不是一個(gè)web服務(wù)器,但支持http協(xié)議,在只需提供http接口的應(yīng)用場(chǎng)景(比如完全前后端分離的后端)基于Nebula的單進(jìn)程http服務(wù)端并發(fā)量就可以是tomcat的數(shù)十倍。這一定程度上得益于Nebula框架在http通信上protobuf的應(yīng)用。Nebula框架解析http文本協(xié)議并轉(zhuǎn)化為HttpMsg在服務(wù)內(nèi)部處理,應(yīng)用開發(fā)者填充HttpMsg,接入層將響應(yīng)的HttpMsg轉(zhuǎn)換成http文本協(xié)議發(fā)回給請(qǐng)求方,不管服務(wù)端內(nèi)部經(jīng)過多少次中轉(zhuǎn),始終只有一次http協(xié)議的decode和一次http協(xié)議的encode。

```C++
syntax = "proto3";

message HttpMsg
{
int32 type = 1; ///< http_parser_type 請(qǐng)求或響應(yīng)
int32 http_major = 2; ///< http大版本號(hào)
int32 http_minor = 3; ///< http小版本號(hào)
int32 content_length = 4; ///< 內(nèi)容長度
int32 method = 5; ///< 請(qǐng)求方法
int32 status_code = 6; ///< 響應(yīng)狀態(tài)碼
int32 encoding = 7; ///< 傳輸編碼(只在encode時(shí)使用,當(dāng) Transfer-Encoding: chunked 時(shí),用于標(biāo)識(shí)chunk序號(hào),0表示第一個(gè)chunk,依次遞增)
string url = 8; ///< 地址
map<string, string> headers = 9; ///< http頭域
bytes body = 10; ///< 消息體(當(dāng) Transfer-Encoding: chunked 時(shí),只存儲(chǔ)一個(gè)chunk)
map<string, string> params = 11; ///< GET方法參數(shù),POST方法表單提交的參數(shù)
Upgrade upgrade = 12; ///< 升級(jí)協(xié)議
float keep_alive = 13; ///< keep alive time
string path = 14; ///< Http Decode時(shí)從url中解析出來,不需要人為填充(encode時(shí)不需要填)
bool is_decoding = 15; ///< 是否正在解碼(true 正在解碼, false 未解碼或已完成解碼)

    message Upgrade
    {
        bool is_upgrade             = 1;
        string protocol             = 2;
    }

}


??HttpMsg的編解碼實(shí)現(xiàn)見Nebula框架的[https://github.com/Bwar/Nebula/blob/master/src/codec/CodecHttp.cpp](https://github.com/Bwar/Nebula/blob/master/src/codec/CodecHttp.cpp)。

### 3. 數(shù)據(jù)庫代理服務(wù)協(xié)議設(shè)計(jì)

??如果上面描述的protobuf在網(wǎng)絡(luò)通信上應(yīng)用算不錯(cuò)的話,那以下將protobuf用于數(shù)據(jù)代理上的協(xié)議設(shè)計(jì)則絕對(duì)是讓人眼前一亮。

??有的公司規(guī)定web服務(wù)不得直接訪問MySQL數(shù)據(jù)庫,甚至不允許在web邏輯層拼接SQL語句。如果有這種出于安全性考慮而做的限制,在web邏輯層后面再增加一層業(yè)務(wù)邏輯層成本未免太高了,那么解決辦法應(yīng)該是增加一層業(yè)務(wù)邏輯無關(guān)的代理服務(wù)層。這個(gè)代理服務(wù)層不是簡單的轉(zhuǎn)發(fā)SQL語句這么簡單,因?yàn)閣eb邏輯層可能不允許拼接SQL,由此引出我們這個(gè)用于數(shù)據(jù)庫代理的protobuf協(xié)議設(shè)計(jì)。這個(gè)協(xié)議是將SQL邏輯融入整個(gè)協(xié)議之中,數(shù)據(jù)庫代理層接收并解析這個(gè)協(xié)議后生成SQL語句或用binding方式到數(shù)據(jù)庫去執(zhí)行。數(shù)據(jù)庫代理層只有協(xié)議解析和轉(zhuǎn)化邏輯,無其他任何業(yè)務(wù)邏輯,業(yè)務(wù)邏輯還在web邏輯層,區(qū)別只在于從拼接SQL變成了填充protobuf協(xié)議。

```C++
syntax = "proto2";

package dbagent;

/**
 * @brief DB Agent消息
 */
message DbAgentMsg
{
    enum E_TYPE
    {
        UNDEFINE                      = 0;              ///< 未定義
        REQ_CONNECT                   = 1;              ///< 連接DB請(qǐng)求
        RSP_CONNECT                   = 2;              ///< 連接DB響應(yīng)
        REQ_QUERY                     = 3;              ///< 執(zhí)行SQL請(qǐng)求
        RSP_QUERY                     = 4;              ///< 執(zhí)行SQL響應(yīng)
        REQ_DISCONNECT                = 5;              ///< 關(guān)閉連接請(qǐng)求
        RSP_DISCONNECT                = 6;              ///< 關(guān)閉連接響應(yīng)
        RSP_RECORD                    = 7;              ///< 結(jié)果集記錄
        RSP_COMMON                    = 8;              ///< 通用響應(yīng)(當(dāng)請(qǐng)求不能被Server所認(rèn)知時(shí)會(huì)做出這個(gè)回應(yīng))
        REQ_GET_CONNECT               = 9;              ///< 獲取連接請(qǐng)求
        RSP_GET_CONNECT               = 10;             ///< 獲取連接響應(yīng)
    }

    required E_TYPE type                        = 1;    ///< 消息/操作 類型
    optional RequestConnect req_connect         = 2;    ///< 連接請(qǐng)求
    optional ResponseConnect rsp_connect        = 3;    ///< 連接響應(yīng)
    optional RequestDisconnect req_disconnect   = 4;    ///< 關(guān)閉請(qǐng)求
    optional ResponseDisconnect rsp_disconnect  = 5;    ///< 關(guān)閉響應(yīng)
    optional RequestQuery req_query             = 6;    ///< 執(zhí)行SQL請(qǐng)求
    optional ResponseQuery rsp_query            = 7;    ///< 執(zhí)行SQL響應(yīng)
    optional ResponseRecord rsp_record          = 8;    ///< SELECT結(jié)果集記錄
    optional ResponseCommon rsp_common          = 9;    ///< 通用響應(yīng)
    optional RequestGetConnection req_get_conn  = 10;   ///< 獲取連接請(qǐng)求
    optional ResponseGetConnection rsp_get_conn = 11;   ///< 獲取連接響應(yīng)
}

/**
 * @brief 連接請(qǐng)求
 */
message RequestConnect
{
    required string host        = 1;                    ///< DB所在服務(wù)器IP
    required int32  port        = 2;                    ///< DB端口
    required string user        = 3;                    ///< DB用戶名
    required string password    = 4;                    ///< DB用戶密碼
    required string dbname      = 5;                    ///< DB庫名
    required string charset     = 6;                    ///< DB字符集
}

/**
 * @brief 連接響應(yīng)
 */
message ResponseConnect
{
    required int32 connect_id  = 1;                    ///< 連接ID (連接失敗時(shí),connect_id為0)
    optional int32 err_no       = 2;                   ///< 錯(cuò)誤碼 0 表示連接成功
    optional string err_msg     = 3;                   ///< 錯(cuò)誤信息
}

/**
 * @brief 關(guān)閉連接請(qǐng)求
 */
message RequestDisconnect
{
    required int32 connect_id  = 1;                    ///< 連接ID (連接失敗時(shí),connect_id為0)
}

/**
 * @brief 關(guān)閉連接響應(yīng)
 */
message ResponseDisconnect
{
    optional int32 err_no       = 2;                    ///< 錯(cuò)誤碼 0 表示連接成功
    optional string err_msg     = 3;                    ///< 錯(cuò)誤信息
}

/**
 * @brief 執(zhí)行SQL請(qǐng)求
 */
message RequestQuery
{
    required E_QUERY_TYPE query_type  = 1;              ///< 查詢類型
    required string table_name        = 2;              ///< 表名
    repeated Field fields             = 3;              ///< 列類型
    repeated ConditionGroup conditions= 4;              ///< where條件組(由group_relation指定,若不指定則默認(rèn)為AND關(guān)系)
    repeated string groupby_col       = 5;              ///< group by字段
    repeated OrderBy orderby_col      = 6;              ///< order by字段
    optional uint32 limit             = 7;              ///< 指定返回的行數(shù)的最大值  (limit 200)
    optional uint32 limit_from        = 8;              ///< 指定返回的第一行的偏移量 (limit 100, 200)
    optional ConditionGroup.E_RELATION group_relation = 9; ///< where條件組的關(guān)系,條件組之間有且只有一種關(guān)系(and或者or)
    optional int32 connect_id         = 10;             ///< 連接ID,有效連接ID(長連接,當(dāng)connect后多次執(zhí)行query可以使用connect_id)
    optional string bid               = 11;             ///< 業(yè)務(wù)ID,在CmdDbAgent.json配置文件中配置(短連接,每次執(zhí)行query時(shí)連接,執(zhí)行完后關(guān)閉連接)
    optional string password          = 12;             ///< 業(yè)務(wù)密碼

    enum E_QUERY_TYPE                                   ///< 查詢類型
    {
        SELECT                        = 0;              ///< select查詢
        INSERT                        = 1;              ///< insert插入
        INSERT_IGNORE                 = 2;              ///< insert ignore插入,若存在則放棄
        UPDATE                        = 3;              ///< update更新
        REPLACE                       = 4;              ///< replace覆蓋插入
        DELETE                        = 5;              ///< delete刪除
    }

    enum E_COL_TYPE                                     ///< 列類型
    {
        STRING                        = 0;              ///< char, varchar, text, datetime, timestamp等
        INT                           = 1;              ///< int
        BIGINT                        = 2;              ///< bigint
        FLOAT                         = 3;              ///< float
        DOUBLE                        = 4;              ///< double
    }

    message Field                                       ///< 字段
    {
        required string col_name      = 1;              ///< 列名
        required E_COL_TYPE col_type  = 2;              ///< 列類型
        required bytes col_value      = 3;              ///< 列值
        optional string col_as        = 4;              ///< as列名
    }

    message Condition                                   ///< where條件
    {
        required E_RELATION relation  = 1;              ///< 關(guān)系(=, !=, >, <, >=, <= 等)
        required E_COL_TYPE col_type  = 2;              ///< 列類型
        required string col_name      = 3;              ///< 列名
        repeated bytes col_values     = 4;              ///< 列值(當(dāng)且僅當(dāng)relation為IN時(shí)值的個(gè)數(shù)大于1有效)
        optional string col_name_right= 5;              ///< 關(guān)系右邊列名(用于where col1=col2這種情況)
        enum E_RELATION
        {
            EQ                        = 0;              ///< 等于=
            NE                        = 1;              ///< 不等于!=
            GT                        = 2;              ///< 大于>
            LT                        = 3;              ///< 小于<
            GE                        = 4;              ///< 大于等于>=
            LE                        = 5;              ///< 小于等于<=
            LIKE                      = 6;              ///< like
            IN                        = 7;              ///< in (1, 2, 3, 4, 5)
        }
    }

    message ConditionGroup                              ///< where條件組合
    {
        required E_RELATION relation     = 1;           ///< 條件之間的關(guān)系,一個(gè)ConditionGroup里的所有Condition之間有且只有一種關(guān)系(and或者or)
        repeated Condition condition     = 2;           ///< 條件
        enum E_RELATION
        {
            AND                        = 0;             ///< and且
            OR                         = 1;             ///< or或
        }
    }

    message OrderBy
    {
        required E_RELATION relation    = 1;            ///< 降序或升序
        required string col_name        = 2;            ///< 列名
        enum E_RELATION
        {
            ASC                         = 0;
            DESC                        = 1;
        }
    }
}

/**
 * @brief 執(zhí)行SQL響應(yīng)
 */
message ResponseQuery
{
    required uint32 seq         = 1;                    ///< 數(shù)據(jù)包序列號(hào)(SELECT結(jié)果集會(huì)分包返回,只有一個(gè)包的情況或已到達(dá)最后一個(gè)包則seq=0xFFFFFFFF)
    required int32 err_no       = 2;                    ///< 錯(cuò)誤碼,0 表示執(zhí)行成功
    optional string err_msg     = 3;                    ///< 錯(cuò)誤信息
    optional uint64 insert_id   = 4;                    ///< mysql_insert_id()獲取的值(視執(zhí)行的SQL語句而定,不一定存在)
    repeated bytes dict         = 5;                    ///< 結(jié)果集字典(視執(zhí)行的SQL語句而定,不一定存在)
}

/**
 * @brief SELECT語句返回結(jié)果集的一條記錄
 */
message ResponseRecord
{
    required uint32 seq         = 1;                    ///< 數(shù)據(jù)包序列號(hào)(SELECT結(jié)果集會(huì)分包返回,已到達(dá)最后一個(gè)包則seq=0xFFFFFFFF)
    repeated bytes field        = 2;                    ///< 數(shù)據(jù)集記錄的字段
}

/**
 * @brief 常規(guī)響應(yīng)
 */
message ResponseCommon
{
    optional int32 err_no       = 1;                    ///< 錯(cuò)誤碼 0 表示連接成功
    optional string err_msg     = 2;                    ///< 錯(cuò)誤信息
}

/**
 * @brief 獲取連接請(qǐng)求
 */
message RequestGetConnection
{
    required string bid         = 1;                    ///< 業(yè)務(wù)ID,在dbproxy配置文件中配置
    required string password    = 2;                    ///< 業(yè)務(wù)密碼
}

/**
 * @brief 獲取連接響應(yīng)
 */
message ResponseGetConnection
{
    required int32 connect_id   = 1;                    ///< 連接ID,有效連接ID,否則執(zhí)行失敗
    optional int32 err_no       = 2;                   ///< 錯(cuò)誤碼 0 表示連接成功
    optional string err_msg     = 3;                   ///< 錯(cuò)誤信息
}

??基于這個(gè)數(shù)據(jù)庫操作協(xié)議開發(fā)的數(shù)據(jù)庫代理層完全解決了web邏輯層不允許直接訪問數(shù)據(jù)庫也不允許拼接SQL語句的問題,而且?guī)缀鯖]有增加開發(fā)代價(jià)。另外,基于這個(gè)協(xié)議的數(shù)據(jù)庫代理天然防止SQL注入(在代理層校驗(yàn)field_name,并且mysql_escape_string(filed_value)),雖然防SQL注入應(yīng)是應(yīng)用層的責(zé)任,但多了數(shù)據(jù)代理這層保障也是好事。

??這個(gè)協(xié)議只支持簡單SQL,不支持聯(lián)合查詢、子查詢,也不支持存儲(chǔ)過程,如果需要支持的話協(xié)議會(huì)更復(fù)雜。在Bwar所負(fù)責(zé)過的業(yè)務(wù)里,基本都禁止數(shù)據(jù)庫聯(lián)合查詢之類,只把數(shù)據(jù)庫當(dāng)存儲(chǔ)用,不把邏輯寫到SQL語句里,所以這個(gè)協(xié)議滿足大部分業(yè)務(wù)需要。

??這一節(jié)只說明數(shù)據(jù)庫代理協(xié)議,下一節(jié)將從數(shù)據(jù)庫代理協(xié)議延伸并提供協(xié)議代碼講解。

4. Redis和MySQL數(shù)據(jù)代理協(xié)議設(shè)計(jì)

??大部分后臺(tái)應(yīng)用只有MySQL是不夠的,往往還需要緩存,經(jīng)常會(huì)用Redis來做數(shù)據(jù)緩存。用緩存意味著數(shù)據(jù)至少需要同時(shí)寫到Redis和MySQL,又或者在未命中緩存時(shí)從MySQL中讀取到的數(shù)據(jù)需要回寫到Redis,這些通常都是由業(yè)務(wù)邏輯層來做的。也有例外,Nebula提供的分布式解決方案是由數(shù)據(jù)代理層來做的,業(yè)務(wù)邏輯層只需向數(shù)據(jù)代理層發(fā)送一個(gè)protobuf協(xié)議數(shù)據(jù),數(shù)據(jù)代理層就會(huì)完成Redis和MySQL雙寫或緩存未命中時(shí)的自動(dòng)回寫(暫且不探討數(shù)據(jù)一致性問題)。數(shù)據(jù)代理層來做這些工作是為了減少業(yè)務(wù)邏輯層的復(fù)雜度,提高開發(fā)效率。既然是為了提高開發(fā)效率,就得讓業(yè)務(wù)邏輯層低于原來同時(shí)操作Redis和MySQL的開發(fā)量。Nebula提供的NebulaMydis就是這樣一個(gè)讓原來同時(shí)操作Redis和MySQL的開發(fā)量(假設(shè)是2)降到1.2左右。

??這個(gè)同時(shí)操作Redis和MySQL的數(shù)據(jù)代理協(xié)議如下:

```C++
syntax = "proto3";

package neb;

message Mydis
{
uint32 section_factor = 1;
RedisOperate redis_operate = 2;
DbOperate db_operate = 3;

message RedisOperate
{
    bytes key_name           = 1;
    string redis_cmd_read    = 2;
    string redis_cmd_write   = 3;
    OPERATE_TYPE op_type     = 4;
    repeated Field fields    = 5;
    int32 key_ttl            = 6;
    int32 redis_structure    = 7;      ///< redis數(shù)據(jù)類型
    int32 data_purpose       = 8;      ///< 數(shù)據(jù)用途
    bytes hash_key           = 9;      ///< 可選hash key,當(dāng)has_hash_key()時(shí)用hash_key來計(jì)算hash值,否則用key_name來計(jì)算hash值

    enum OPERATE_TYPE
    {
        T_READ  = 0;
        T_WRITE = 1;
    }
}

message DbOperate
{
    E_QUERY_TYPE query_type                   = 1;         ///< 查詢類型
    string table_name                         = 2;         ///< 表名
    repeated Field fields                     = 3;         ///< 列類型
    repeated ConditionGroup conditions        = 4;         ///< where條件組(由group_relation指定,若不指定則默認(rèn)為AND關(guān)系)
    repeated string groupby_col               = 5;         ///< group by字段
    repeated OrderBy orderby_col              = 6;         ///< order by字段
    ConditionGroup.E_RELATION group_relation  = 7;         ///< where條件組的關(guān)系,條件組之間有且只有一種關(guān)系(and或者or)
    uint32 limit                              = 8;         ///< 指定返回的行數(shù)的最大值  (limit 200)
    uint32 limit_from                         = 9;         ///< 指定返回的第一行的偏移量 (limit 100, 200)
    uint32 mod_factor                         = 10;        ///< 分表取模因子,當(dāng)這個(gè)字段沒有時(shí)使用section_factor

    enum E_QUERY_TYPE                                      ///< 查詢類型
    {
        SELECT                        = 0;              ///< select查詢
        INSERT                        = 1;              ///< insert插入
        INSERT_IGNORE                 = 2;              ///< insert ignore插入,若存在則放棄
        UPDATE                        = 3;              ///< update更新
        REPLACE                       = 4;              ///< replace覆蓋插入
        DELETE                        = 5;              ///< delete刪除
    }

    message Condition                                         ///< where條件
    {
        E_RELATION relation                  = 1;              ///< 關(guān)系(=, !=, >, <, >=, <= 等)
        E_COL_TYPE col_type                  = 2;              ///< 列類型
        string col_name                      = 3;              ///< 列名
        repeated bytes col_values            = 4;              ///< 列值(當(dāng)且僅當(dāng)relation為IN時(shí)值的個(gè)數(shù)大于1有效)
        string col_name_right                = 5;              ///< 關(guān)系右邊列名(用于where col1=col2這種情況)
        enum E_RELATION
        {
            EQ                        = 0;              ///< 等于=
            NE                        = 1;              ///< 不等于!=
            GT                        = 2;              ///< 大于>
            LT                        = 3;              ///< 小于<
            GE                        = 4;              ///< 大于等于>=
            LE                        = 5;              ///< 小于等于<=
            LIKE                      = 6;              ///< like
            IN                        = 7;              ///< in (1, 2, 3, 4, 5)
        }
    }

    message ConditionGroup                              ///< where條件組合
    {
        E_RELATION relation                      = 1;           ///< 條件之間的關(guān)系,一個(gè)ConditionGroup里的所有Condition之間有且只有一種關(guān)系(and或者or)
        repeated Condition condition             = 2;           ///< 條件
        enum E_RELATION
        {
            AND                        = 0;             ///< and且
            OR                         = 1;             ///< or或
        }
    }

    message OrderBy
    {
        E_RELATION relation                      = 1;            ///< 降序或升序
        string col_name                          = 2;            ///< 列名
        enum E_RELATION
        {
            ASC                         = 0;
            DESC                        = 1;
        }
    }
}

}

enum E_COL_TYPE ///< 列類型
{
STRING = 0; ///< char, varchar, text, datetime, timestamp等
INT = 1; ///< int
BIGINT = 2; ///< bigint
FLOAT = 3; ///< float
DOUBLE = 4; ///< double
}

message Record
{
repeated Field field_info = 1; ///< value data
}

message Field ///< 字段
{
string col_name = 1; ///< 列名
E_COL_TYPE col_type = 2; ///< 列類型
bytes col_value = 3; ///< 列值
string col_as = 4; ///< as列名
}

/**

  • @brief 查詢結(jié)果
  • @note 適用于Redis返回和MySQL返回,當(dāng)totalcount與curcount相等時(shí)表明數(shù)據(jù)已接收完畢,
  • 否則表示數(shù)據(jù)尚未接收完,剩余的數(shù)據(jù)會(huì)在后續(xù)數(shù)據(jù)包繼續(xù)返回。
    */
    message Result
    {
    int32 err_no = 1;
    bytes err_msg = 2;
    int32 total_count = 3;
    int32 current_count = 4;
    repeated Record record_data = 5;
    int32 from = 6; ///< 數(shù)據(jù)來源 E_RESULT_FROM
    DataLocate locate = 7; ///< 僅在DataProxy使用
    enum E_RESULT_FROM
    {
    FROM_DB = 0;
    FROM_REDIS = 1;
    }
    message DataLocate
    {
    uint32 section_from = 1;
    uint32 section_to = 2; ///< 數(shù)據(jù)所在分段,section_from < MemOperate.section_factor <= section_to
    uint32 hash = 3; ///< 用于做分布的hash值(取模運(yùn)算時(shí),為取模后的結(jié)果)
    uint32 divisor = 4; ///< 取模運(yùn)算的除數(shù)(一致性hash時(shí)不需要)
    }
    }

??這個(gè)協(xié)議分了Redis和MySQL兩部分?jǐn)?shù)據(jù),看似業(yè)務(wù)邏輯層把一份數(shù)據(jù)填充了兩份并沒有降低多少開發(fā)量,實(shí)際上這兩部分?jǐn)?shù)據(jù)有許多是可共用的,只要提供一個(gè)填充類就可以大幅降低協(xié)議填充開發(fā)量。為簡化協(xié)議填充,Nebula提供了幾個(gè)類:同時(shí)填充Redis和MySQL數(shù)據(jù)、只填充Redis、只填充MySQL。

??從Mydis協(xié)議的MySQL部分如何生成SQL語句請(qǐng)參考NebulaDbAgent,核心代碼頭文件如下:

```C++
namespace dbagent
{

const int gc_iMaxBeatTimeInterval = 30;
const int gc_iMaxColValueSize = 65535;

struct tagConnection
{
CMysqlDbi* pDbi;
time_t ullBeatTime;
int iQueryPermit;
int iTimeout;

tagConnection() : pDbi(NULL), ullBeatTime(0), iQueryPermit(0), iTimeout(0)
{
}

~tagConnection()
{
    if (pDbi != NULL)
    {
        delete pDbi;
        pDbi = NULL;
    }
}

};

class CmdExecSql : public neb::Cmd, public neb::DynamicCreator<CmdExecSql, int32>
{
public:
CmdExecSql(int32 iCmd);
virtual ~CmdExecSql();

virtual bool Init();

virtual bool AnyMessage(
                std::shared_ptr<neb::SocketChannel> pChannel,
                const MsgHead& oMsgHead,
                const MsgBody& oMsgBody);

protected:
bool GetDbConnection(const neb::Mydis& oQuery, CMysqlDbi ppMasterDbi, CMysqlDbi ppSlaveDbi);
bool FetchOrEstablishConnection(neb::Mydis::DbOperate::E_QUERY_TYPE eQueryType,
const std::string& strMasterIdentify, const std::string& strSlaveIdentify,
const neb::CJsonObject& oInstanceConf, CMysqlDbi ppMasterDbi, CMysqlDbi ppSlaveDbi);
std::string GetFullTableName(const std::string& strTableName, uint32 uiFactor);

int ConnectDb(const neb::CJsonObject& oInstanceConf, CMysqlDbi* pDbi, bool bIsMaster = true);
int Query(const neb::Mydis& oQuery, CMysqlDbi* pDbi);
void CheckConnection(); //檢查連接是否已超時(shí)
void Response(int iErrno, const std::string& strErrMsg);
bool Response(const neb::Result& oRsp);

bool CreateSql(const neb::Mydis& oQuery, CMysqlDbi* pDbi, std::string& strSql);
bool CreateSelect(const neb::Mydis& oQuery, std::string& strSql);
bool CreateInsert(const neb::Mydis& oQuery, CMysqlDbi* pDbi, std::string& strSql);
bool CreateUpdate(const neb::Mydis& oQuery, CMysqlDbi* pDbi, std::string& strSql);
bool CreateDelete(const neb::Mydis& oQuery, std::string& strSql);
bool CreateCondition(const neb::Mydis::DbOperate::Condition& oCondition, CMysqlDbi* pDbi, std::string& strCondition);
bool CreateConditionGroup(const neb::Mydis& oQuery, CMysqlDbi* pDbi, std::string& strCondition);
bool CreateGroupBy(const neb::Mydis& oQuery, std::string& strGroupBy);
bool CreateOrderBy(const neb::Mydis& oQuery, std::string& strOrderBy);
bool CreateLimit(const neb::Mydis& oQuery, std::string& strLimit);
bool CheckColName(const std::string& strColName);

private:
std::shared_ptr<neb::SocketChannel> m_pChannel;
MsgHead m_oInMsgHead;
MsgBody m_oInMsgBody;
int m_iConnectionTimeout; //空閑連接超時(shí)(單位秒)
char m_szColValue; //字段值
neb::CJsonObject m_oDbConf;
uint32 m_uiSectionFrom;
uint32 m_uiSectionTo;
uint32 m_uiHash;
uint32 m_uiDivisor;
std::map<std::string, std::set<uint32> > m_mapFactorSection; //分段因子區(qū)間配置,key為因子類型
std::map<std::string, neb::CJsonObject
> m_mapDbInstanceInfo; //數(shù)據(jù)庫配置信息key為("%u:%u:%u", uiDataType, uiFactor, uiFactorSection)
std::map<std::string, tagConnection*> m_mapDbiPool; //數(shù)據(jù)庫連接池,key為identify(如:192.168.18.22:3306)
};

} // namespace dbagent



??整個(gè)mydis數(shù)據(jù)協(xié)議是如何解析如何使用,如何做Redis和MySQL的數(shù)據(jù)雙寫、緩存數(shù)據(jù)回寫等不在本文討論范圍,如有興趣可以閱讀[NebulaMydis](https://github.com/Bwar/NebulaMydis)源碼,也可以聯(lián)系Bwar。

### 5. 結(jié)語

??Protobuf用得合適用得好可以解決許多問題,可以提高開發(fā)效率,也可以提高運(yùn)行效率,以上就是Bwar多年應(yīng)用protobuf的小結(jié),沒有任何藏私,文中列出的協(xié)議都可以在開源項(xiàng)目[Nebula](https://github.com/Bwar/Nebula)的這個(gè)路徑[https://github.com/Bwar/Nebula/tree/master/proto](https://github.com/Bwar/Nebula/tree/master/proto)找到。

??開發(fā)Nebula框架目的是致力于提供一種基于C\+\+快速構(gòu)建高性能的分布式服務(wù)。如果覺得本文對(duì)你有用,別忘了到Nebula的[__Github__](https://github.com/Bwar/Nebula)或[__碼云__](https://gitee.com/Bwar/Nebula)給個(gè)star,謝謝。
向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI