您好,登錄后才能下訂單哦!
??Protobuf應(yīng)用廣泛,尤其作為網(wǎng)絡(luò)通訊協(xié)議最為普遍。本文將詳細(xì)描述幾個(gè)讓人眼前一亮的protobuf協(xié)議設(shè)計(jì),對(duì)準(zhǔn)備應(yīng)用或已經(jīng)應(yīng)用protobuf的開發(fā)者會(huì)有所啟發(fā),甚至可以直接拿過去用。 這里描述的協(xié)議設(shè)計(jì)被用于生產(chǎn)環(huán)境的即時(shí)通訊、埋點(diǎn)數(shù)據(jù)采集、消息推送、redis和mysql數(shù)據(jù)代理。
??Bwar從2013年開始應(yīng)用protobuf,2014年設(shè)計(jì)了用于mysql數(shù)據(jù)代理的protobuf協(xié)議,2015年設(shè)計(jì)了用于即時(shí)通訊的protobuf協(xié)議。高性能C++ IoC網(wǎng)絡(luò)框架Nebula https://github.com/Bwar/Nebula把這幾個(gè)protobuf協(xié)議設(shè)計(jì)應(yīng)用到了極致。
??本協(xié)議設(shè)計(jì)于2015年,用于一個(gè)生產(chǎn)環(huán)境的IM和埋點(diǎn)數(shù)據(jù)采集及實(shí)時(shí)分析,2016年又延伸發(fā)展了基于protobuf3的版本并用于開源網(wǎng)絡(luò)框架Nebula?;趐rotobuf2和protobuf3的有較少差別,這里分開講解兩個(gè)版本的協(xié)議設(shè)計(jì)。
??2015年尚無protobuf3的release版本,protobuf2版本的fixed32類型是固定占用4個(gè)字節(jié)的,非常適合用于網(wǎng)絡(luò)通訊協(xié)議設(shè)計(jì)。Bwar設(shè)計(jì)用于IM系統(tǒng)的協(xié)議包括兩個(gè)protobuf message:MsgHead和MsgBody,協(xié)議定義如下:
```C++
syntax = "proto2";
/**
/**
??解析收到的字節(jié)流時(shí)先解固定長度(15字節(jié))的MsgHead(protobuf3.0之后的版本必須在cmd、msgbody_len、seq均不為0的情況下才是15字節(jié)),再通過MsgHead里的msgbody_len判斷消息體是否接收完畢,若接收完畢則調(diào)用MsgBody.Parse()解析。MsgBody里的設(shè)計(jì)在下一節(jié)詳細(xì)說明。
??MsgHead在實(shí)際的項(xiàng)目應(yīng)用中對(duì)應(yīng)下面的消息頭并可以相互轉(zhuǎn)換:
```C++
#pragma pack(1)
/**
#pragma pack()
??轉(zhuǎn)換代碼如下:
```C++
E_CODEC_STATUS ClientMsgCodec::Encode(const MsgHead& oMsgHead, const MsgBody& oMsgBody, loss::CBuffer* pBuff)
{
tagClientMsgHead stClientMsgHead;
stClientMsgHead.version = 1; // version暫時(shí)無用
stClientMsgHead.encript = (unsigned char)(oMsgHead.cmd() >> 24);
stClientMsgHead.cmd = htons((unsigned short)(gc_uiCmdBit & oMsgHead.cmd()));
stClientMsgHead.body_len = htonl((unsigned int)oMsgHead.msgbody_len());
stClientMsgHead.seq = htonl(oMsgHead.seq());
stClientMsgHead.checksum = htons((unsigned short)stClientMsgHead.checksum);
...
}
E_CODEC_STATUS ClientMsgCodec::Decode(loss::CBuffer* pBuff, MsgHead& oMsgHead, MsgBody& oMsgBody)
{
LOG4_TRACE("%s() pBuff->ReadableBytes() = %u", __FUNCTION__, pBuff->ReadableBytes());
size_t uiHeadSize = sizeof(tagClientMsgHead);
if (pBuff->ReadableBytes() >= uiHeadSize)
{
tagClientMsgHead stClientMsgHead;
int iReadIdx = pBuff->GetReadIndex();
pBuff->Read(&stClientMsgHead, uiHeadSize);
stClientMsgHead.cmd = ntohs(stClientMsgHead.cmd);
stClientMsgHead.body_len = ntohl(stClientMsgHead.body_len);
stClientMsgHead.seq = ntohl(stClientMsgHead.seq);
stClientMsgHead.checksum = ntohs(stClientMsgHead.checksum);
LOG4_TRACE("cmd %u, seq %u, len %u, pBuff->ReadableBytes() %u",
stClientMsgHead.cmd, stClientMsgHead.seq, stClientMsgHead.body_len,
pBuff->ReadableBytes());
oMsgHead.set_cmd(((unsigned int)stClientMsgHead.encript << 24) | stClientMsgHead.cmd);
oMsgHead.set_msgbody_len(stClientMsgHead.body_len);
oMsgHead.set_seq(stClientMsgHead.seq);
...
}
}
<br/>
??protobuf3版的MsgHead和MsgBody從IM業(yè)務(wù)應(yīng)用實(shí)踐中發(fā)展而來,同時(shí)滿足了埋點(diǎn)數(shù)據(jù)采集、實(shí)時(shí)計(jì)算、消息推送等業(yè)務(wù)需要,更為通用。正因其通用性和高擴(kuò)展性,采用proactor模型的IoC網(wǎng)絡(luò)框架Nebula才會(huì)選用這個(gè)協(xié)議,通過這個(gè)協(xié)議,框架層將網(wǎng)絡(luò)通信工作從業(yè)務(wù)應(yīng)用中完全獨(dú)立出來,基于Nebula框架的應(yīng)用開發(fā)者甚至可以不懂網(wǎng)絡(luò)編程也能開發(fā)出高并發(fā)的分布式服務(wù)。
??MsgHead和MsgBody的protobuf定義如下:
```C++
syntax = "proto3";
// import "google/protobuf/any.proto";
/**
/**
服務(wù)端接入路由,請(qǐng)求包必須填充。rsp_result是響應(yīng)結(jié)果,響應(yīng)包必須填充。
*/
message MsgBody
{
oneof msg_type
{
Request req_target = 1; ///< 請(qǐng)求目標(biāo)(請(qǐng)求包必須填充)
Response rsp_result = 2; ///< 響應(yīng)結(jié)果(響應(yīng)包必須填充)
}
bytes data = 3; ///< 消息體主體
bytes add_on = 4; ///< 服務(wù)端接入層附加在請(qǐng)求包的數(shù)據(jù)(客戶端無須理會(huì))
string trace_id = 5; ///< for log trace
message Request
{
uint32 route_id = 1; ///< 路由ID
string route = 2; ///< 路由ID(當(dāng)route_id用整型無法表達(dá)時(shí)使用)
}
message Response
{
int32 code = 1; ///< 錯(cuò)誤碼
bytes msg = 2; ///< 錯(cuò)誤信息
}
}
??MsgBody的data字段存儲(chǔ)消息主體,任何自定義數(shù)據(jù)均可以二進(jìn)制數(shù)據(jù)流方式寫入到data。
??msg_type用于標(biāo)識(shí)該消息是請(qǐng)求還是響應(yīng)(所有網(wǎng)絡(luò)數(shù)據(jù)流都可歸為請(qǐng)求或響應(yīng)),如果是請(qǐng)求,則可以選擇性填充Request里的route_id或route,如果填充了,則框架層無須解析應(yīng)用層協(xié)議(也無法解析)就能自動(dòng)根據(jù)路由ID轉(zhuǎn)發(fā),而無須應(yīng)用層解開data里的內(nèi)容再根據(jù)自定義邏輯轉(zhuǎn)發(fā)。如果是響應(yīng),則定義了統(tǒng)一的錯(cuò)誤標(biāo)準(zhǔn),也為業(yè)務(wù)無關(guān)的錯(cuò)誤處理提供方便。
??add_on是附在長連接上的業(yè)務(wù)數(shù)據(jù),框架并不會(huì)解析但會(huì)在每次轉(zhuǎn)發(fā)消息時(shí)帶上,可以為應(yīng)用提供極其方便且強(qiáng)大的功能。比如,IM用戶登錄時(shí)客戶端只發(fā)送用戶ID和密碼到服務(wù)端,服務(wù)端在登錄校驗(yàn)通過后,將該用戶的昵稱、頭像等信息通過框架提供的方法SetClientData()將數(shù)據(jù)附在服務(wù)端接入層該用戶對(duì)應(yīng)的長連接Channel上,之后所有從該連接過來的請(qǐng)求都會(huì)由框架層自動(dòng)填充add_on字段,服務(wù)端的其他邏輯服務(wù)器只從data中得到自定義業(yè)務(wù)邏輯(比如聊天消息)數(shù)據(jù),卻可以從add_on中得到這個(gè)發(fā)送用戶的信息。add_on的設(shè)計(jì)簡化了應(yīng)用開發(fā)邏輯,并降低了客戶端與服務(wù)端傳輸?shù)臄?shù)據(jù)量。
??trace_id用于分布式日志跟蹤。分布式服務(wù)的錯(cuò)誤定位是相當(dāng)麻煩的,Nebula分布式服務(wù)解決方案提供了日志跟蹤功能,協(xié)議里的trace_id字段的設(shè)計(jì)使得Nebula框架可以在完全不增加應(yīng)用開發(fā)者額外工作的情況下(正常調(diào)用LOG4_INFO寫日志而無須額外工作)實(shí)現(xiàn)所有標(biāo)記著同一trace_id的日志發(fā)送到指定一臺(tái)日志服務(wù)器,定義錯(cuò)誤時(shí)跟單體服務(wù)那樣登錄一臺(tái)服務(wù)器查看日志即可。比如,IM用戶發(fā)送一條消息失敗,在用戶發(fā)送的消息到達(dá)服務(wù)端接入層時(shí)就被打上了trace_id標(biāo)記,這個(gè)id會(huì)一直傳遞到邏輯層、存儲(chǔ)層等,哪個(gè)環(huán)節(jié)發(fā)生了錯(cuò)誤都可以從消息的發(fā)送、轉(zhuǎn)發(fā)、處理路徑上查到。
??MsgHead和MsgBody的編解碼實(shí)現(xiàn)見Nebula框架的https://github.com/Bwar/Nebula/blob/master/src/codec/CodecProto.cpp。
??上面的講解的是protobuf應(yīng)用于TCP數(shù)據(jù)流通信,接下來將描述protobuf在http通信上的應(yīng)用。
??在Web服務(wù)中通常會(huì)用Nginx做接入層的反向代理,經(jīng)過Nginx轉(zhuǎn)發(fā)到后續(xù)業(yè)務(wù)邏輯層的tomcat、apache或nginx上,接入層和業(yè)務(wù)邏輯層至少做了兩次http協(xié)議解析,http協(xié)議是文本協(xié)議,傳輸數(shù)據(jù)量大解析速度慢。Nebula框架不是一個(gè)web服務(wù)器,但支持http協(xié)議,在只需提供http接口的應(yīng)用場(chǎng)景(比如完全前后端分離的后端)基于Nebula的單進(jìn)程http服務(wù)端并發(fā)量就可以是tomcat的數(shù)十倍。這一定程度上得益于Nebula框架在http通信上protobuf的應(yīng)用。Nebula框架解析http文本協(xié)議并轉(zhuǎn)化為HttpMsg在服務(wù)內(nèi)部處理,應(yīng)用開發(fā)者填充HttpMsg,接入層將響應(yīng)的HttpMsg轉(zhuǎn)換成http文本協(xié)議發(fā)回給請(qǐng)求方,不管服務(wù)端內(nèi)部經(jīng)過多少次中轉(zhuǎn),始終只有一次http協(xié)議的decode和一次http協(xié)議的encode。
```C++
syntax = "proto3";
message HttpMsg
{
int32 type = 1; ///< http_parser_type 請(qǐng)求或響應(yīng)
int32 http_major = 2; ///< http大版本號(hào)
int32 http_minor = 3; ///< http小版本號(hào)
int32 content_length = 4; ///< 內(nèi)容長度
int32 method = 5; ///< 請(qǐng)求方法
int32 status_code = 6; ///< 響應(yīng)狀態(tài)碼
int32 encoding = 7; ///< 傳輸編碼(只在encode時(shí)使用,當(dāng) Transfer-Encoding: chunked 時(shí),用于標(biāo)識(shí)chunk序號(hào),0表示第一個(gè)chunk,依次遞增)
string url = 8; ///< 地址
map<string, string> headers = 9; ///< http頭域
bytes body = 10; ///< 消息體(當(dāng) Transfer-Encoding: chunked 時(shí),只存儲(chǔ)一個(gè)chunk)
map<string, string> params = 11; ///< GET方法參數(shù),POST方法表單提交的參數(shù)
Upgrade upgrade = 12; ///< 升級(jí)協(xié)議
float keep_alive = 13; ///< keep alive time
string path = 14; ///< Http Decode時(shí)從url中解析出來,不需要人為填充(encode時(shí)不需要填)
bool is_decoding = 15; ///< 是否正在解碼(true 正在解碼, false 未解碼或已完成解碼)
message Upgrade
{
bool is_upgrade = 1;
string protocol = 2;
}
}
??HttpMsg的編解碼實(shí)現(xiàn)見Nebula框架的[https://github.com/Bwar/Nebula/blob/master/src/codec/CodecHttp.cpp](https://github.com/Bwar/Nebula/blob/master/src/codec/CodecHttp.cpp)。
### 3. 數(shù)據(jù)庫代理服務(wù)協(xié)議設(shè)計(jì)
??如果上面描述的protobuf在網(wǎng)絡(luò)通信上應(yīng)用算不錯(cuò)的話,那以下將protobuf用于數(shù)據(jù)代理上的協(xié)議設(shè)計(jì)則絕對(duì)是讓人眼前一亮。
??有的公司規(guī)定web服務(wù)不得直接訪問MySQL數(shù)據(jù)庫,甚至不允許在web邏輯層拼接SQL語句。如果有這種出于安全性考慮而做的限制,在web邏輯層后面再增加一層業(yè)務(wù)邏輯層成本未免太高了,那么解決辦法應(yīng)該是增加一層業(yè)務(wù)邏輯無關(guān)的代理服務(wù)層。這個(gè)代理服務(wù)層不是簡單的轉(zhuǎn)發(fā)SQL語句這么簡單,因?yàn)閣eb邏輯層可能不允許拼接SQL,由此引出我們這個(gè)用于數(shù)據(jù)庫代理的protobuf協(xié)議設(shè)計(jì)。這個(gè)協(xié)議是將SQL邏輯融入整個(gè)協(xié)議之中,數(shù)據(jù)庫代理層接收并解析這個(gè)協(xié)議后生成SQL語句或用binding方式到數(shù)據(jù)庫去執(zhí)行。數(shù)據(jù)庫代理層只有協(xié)議解析和轉(zhuǎn)化邏輯,無其他任何業(yè)務(wù)邏輯,業(yè)務(wù)邏輯還在web邏輯層,區(qū)別只在于從拼接SQL變成了填充protobuf協(xié)議。
```C++
syntax = "proto2";
package dbagent;
/**
* @brief DB Agent消息
*/
message DbAgentMsg
{
enum E_TYPE
{
UNDEFINE = 0; ///< 未定義
REQ_CONNECT = 1; ///< 連接DB請(qǐng)求
RSP_CONNECT = 2; ///< 連接DB響應(yīng)
REQ_QUERY = 3; ///< 執(zhí)行SQL請(qǐng)求
RSP_QUERY = 4; ///< 執(zhí)行SQL響應(yīng)
REQ_DISCONNECT = 5; ///< 關(guān)閉連接請(qǐng)求
RSP_DISCONNECT = 6; ///< 關(guān)閉連接響應(yīng)
RSP_RECORD = 7; ///< 結(jié)果集記錄
RSP_COMMON = 8; ///< 通用響應(yīng)(當(dāng)請(qǐng)求不能被Server所認(rèn)知時(shí)會(huì)做出這個(gè)回應(yīng))
REQ_GET_CONNECT = 9; ///< 獲取連接請(qǐng)求
RSP_GET_CONNECT = 10; ///< 獲取連接響應(yīng)
}
required E_TYPE type = 1; ///< 消息/操作 類型
optional RequestConnect req_connect = 2; ///< 連接請(qǐng)求
optional ResponseConnect rsp_connect = 3; ///< 連接響應(yīng)
optional RequestDisconnect req_disconnect = 4; ///< 關(guān)閉請(qǐng)求
optional ResponseDisconnect rsp_disconnect = 5; ///< 關(guān)閉響應(yīng)
optional RequestQuery req_query = 6; ///< 執(zhí)行SQL請(qǐng)求
optional ResponseQuery rsp_query = 7; ///< 執(zhí)行SQL響應(yīng)
optional ResponseRecord rsp_record = 8; ///< SELECT結(jié)果集記錄
optional ResponseCommon rsp_common = 9; ///< 通用響應(yīng)
optional RequestGetConnection req_get_conn = 10; ///< 獲取連接請(qǐng)求
optional ResponseGetConnection rsp_get_conn = 11; ///< 獲取連接響應(yīng)
}
/**
* @brief 連接請(qǐng)求
*/
message RequestConnect
{
required string host = 1; ///< DB所在服務(wù)器IP
required int32 port = 2; ///< DB端口
required string user = 3; ///< DB用戶名
required string password = 4; ///< DB用戶密碼
required string dbname = 5; ///< DB庫名
required string charset = 6; ///< DB字符集
}
/**
* @brief 連接響應(yīng)
*/
message ResponseConnect
{
required int32 connect_id = 1; ///< 連接ID (連接失敗時(shí),connect_id為0)
optional int32 err_no = 2; ///< 錯(cuò)誤碼 0 表示連接成功
optional string err_msg = 3; ///< 錯(cuò)誤信息
}
/**
* @brief 關(guān)閉連接請(qǐng)求
*/
message RequestDisconnect
{
required int32 connect_id = 1; ///< 連接ID (連接失敗時(shí),connect_id為0)
}
/**
* @brief 關(guān)閉連接響應(yīng)
*/
message ResponseDisconnect
{
optional int32 err_no = 2; ///< 錯(cuò)誤碼 0 表示連接成功
optional string err_msg = 3; ///< 錯(cuò)誤信息
}
/**
* @brief 執(zhí)行SQL請(qǐng)求
*/
message RequestQuery
{
required E_QUERY_TYPE query_type = 1; ///< 查詢類型
required string table_name = 2; ///< 表名
repeated Field fields = 3; ///< 列類型
repeated ConditionGroup conditions= 4; ///< where條件組(由group_relation指定,若不指定則默認(rèn)為AND關(guān)系)
repeated string groupby_col = 5; ///< group by字段
repeated OrderBy orderby_col = 6; ///< order by字段
optional uint32 limit = 7; ///< 指定返回的行數(shù)的最大值 (limit 200)
optional uint32 limit_from = 8; ///< 指定返回的第一行的偏移量 (limit 100, 200)
optional ConditionGroup.E_RELATION group_relation = 9; ///< where條件組的關(guān)系,條件組之間有且只有一種關(guān)系(and或者or)
optional int32 connect_id = 10; ///< 連接ID,有效連接ID(長連接,當(dāng)connect后多次執(zhí)行query可以使用connect_id)
optional string bid = 11; ///< 業(yè)務(wù)ID,在CmdDbAgent.json配置文件中配置(短連接,每次執(zhí)行query時(shí)連接,執(zhí)行完后關(guān)閉連接)
optional string password = 12; ///< 業(yè)務(wù)密碼
enum E_QUERY_TYPE ///< 查詢類型
{
SELECT = 0; ///< select查詢
INSERT = 1; ///< insert插入
INSERT_IGNORE = 2; ///< insert ignore插入,若存在則放棄
UPDATE = 3; ///< update更新
REPLACE = 4; ///< replace覆蓋插入
DELETE = 5; ///< delete刪除
}
enum E_COL_TYPE ///< 列類型
{
STRING = 0; ///< char, varchar, text, datetime, timestamp等
INT = 1; ///< int
BIGINT = 2; ///< bigint
FLOAT = 3; ///< float
DOUBLE = 4; ///< double
}
message Field ///< 字段
{
required string col_name = 1; ///< 列名
required E_COL_TYPE col_type = 2; ///< 列類型
required bytes col_value = 3; ///< 列值
optional string col_as = 4; ///< as列名
}
message Condition ///< where條件
{
required E_RELATION relation = 1; ///< 關(guān)系(=, !=, >, <, >=, <= 等)
required E_COL_TYPE col_type = 2; ///< 列類型
required string col_name = 3; ///< 列名
repeated bytes col_values = 4; ///< 列值(當(dāng)且僅當(dāng)relation為IN時(shí)值的個(gè)數(shù)大于1有效)
optional string col_name_right= 5; ///< 關(guān)系右邊列名(用于where col1=col2這種情況)
enum E_RELATION
{
EQ = 0; ///< 等于=
NE = 1; ///< 不等于!=
GT = 2; ///< 大于>
LT = 3; ///< 小于<
GE = 4; ///< 大于等于>=
LE = 5; ///< 小于等于<=
LIKE = 6; ///< like
IN = 7; ///< in (1, 2, 3, 4, 5)
}
}
message ConditionGroup ///< where條件組合
{
required E_RELATION relation = 1; ///< 條件之間的關(guān)系,一個(gè)ConditionGroup里的所有Condition之間有且只有一種關(guān)系(and或者or)
repeated Condition condition = 2; ///< 條件
enum E_RELATION
{
AND = 0; ///< and且
OR = 1; ///< or或
}
}
message OrderBy
{
required E_RELATION relation = 1; ///< 降序或升序
required string col_name = 2; ///< 列名
enum E_RELATION
{
ASC = 0;
DESC = 1;
}
}
}
/**
* @brief 執(zhí)行SQL響應(yīng)
*/
message ResponseQuery
{
required uint32 seq = 1; ///< 數(shù)據(jù)包序列號(hào)(SELECT結(jié)果集會(huì)分包返回,只有一個(gè)包的情況或已到達(dá)最后一個(gè)包則seq=0xFFFFFFFF)
required int32 err_no = 2; ///< 錯(cuò)誤碼,0 表示執(zhí)行成功
optional string err_msg = 3; ///< 錯(cuò)誤信息
optional uint64 insert_id = 4; ///< mysql_insert_id()獲取的值(視執(zhí)行的SQL語句而定,不一定存在)
repeated bytes dict = 5; ///< 結(jié)果集字典(視執(zhí)行的SQL語句而定,不一定存在)
}
/**
* @brief SELECT語句返回結(jié)果集的一條記錄
*/
message ResponseRecord
{
required uint32 seq = 1; ///< 數(shù)據(jù)包序列號(hào)(SELECT結(jié)果集會(huì)分包返回,已到達(dá)最后一個(gè)包則seq=0xFFFFFFFF)
repeated bytes field = 2; ///< 數(shù)據(jù)集記錄的字段
}
/**
* @brief 常規(guī)響應(yīng)
*/
message ResponseCommon
{
optional int32 err_no = 1; ///< 錯(cuò)誤碼 0 表示連接成功
optional string err_msg = 2; ///< 錯(cuò)誤信息
}
/**
* @brief 獲取連接請(qǐng)求
*/
message RequestGetConnection
{
required string bid = 1; ///< 業(yè)務(wù)ID,在dbproxy配置文件中配置
required string password = 2; ///< 業(yè)務(wù)密碼
}
/**
* @brief 獲取連接響應(yīng)
*/
message ResponseGetConnection
{
required int32 connect_id = 1; ///< 連接ID,有效連接ID,否則執(zhí)行失敗
optional int32 err_no = 2; ///< 錯(cuò)誤碼 0 表示連接成功
optional string err_msg = 3; ///< 錯(cuò)誤信息
}
??基于這個(gè)數(shù)據(jù)庫操作協(xié)議開發(fā)的數(shù)據(jù)庫代理層完全解決了web邏輯層不允許直接訪問數(shù)據(jù)庫也不允許拼接SQL語句的問題,而且?guī)缀鯖]有增加開發(fā)代價(jià)。另外,基于這個(gè)協(xié)議的數(shù)據(jù)庫代理天然防止SQL注入(在代理層校驗(yàn)field_name,并且mysql_escape_string(filed_value)),雖然防SQL注入應(yīng)是應(yīng)用層的責(zé)任,但多了數(shù)據(jù)代理這層保障也是好事。
??這個(gè)協(xié)議只支持簡單SQL,不支持聯(lián)合查詢、子查詢,也不支持存儲(chǔ)過程,如果需要支持的話協(xié)議會(huì)更復(fù)雜。在Bwar所負(fù)責(zé)過的業(yè)務(wù)里,基本都禁止數(shù)據(jù)庫聯(lián)合查詢之類,只把數(shù)據(jù)庫當(dāng)存儲(chǔ)用,不把邏輯寫到SQL語句里,所以這個(gè)協(xié)議滿足大部分業(yè)務(wù)需要。
??這一節(jié)只說明數(shù)據(jù)庫代理協(xié)議,下一節(jié)將從數(shù)據(jù)庫代理協(xié)議延伸并提供協(xié)議代碼講解。
??大部分后臺(tái)應(yīng)用只有MySQL是不夠的,往往還需要緩存,經(jīng)常會(huì)用Redis來做數(shù)據(jù)緩存。用緩存意味著數(shù)據(jù)至少需要同時(shí)寫到Redis和MySQL,又或者在未命中緩存時(shí)從MySQL中讀取到的數(shù)據(jù)需要回寫到Redis,這些通常都是由業(yè)務(wù)邏輯層來做的。也有例外,Nebula提供的分布式解決方案是由數(shù)據(jù)代理層來做的,業(yè)務(wù)邏輯層只需向數(shù)據(jù)代理層發(fā)送一個(gè)protobuf協(xié)議數(shù)據(jù),數(shù)據(jù)代理層就會(huì)完成Redis和MySQL雙寫或緩存未命中時(shí)的自動(dòng)回寫(暫且不探討數(shù)據(jù)一致性問題)。數(shù)據(jù)代理層來做這些工作是為了減少業(yè)務(wù)邏輯層的復(fù)雜度,提高開發(fā)效率。既然是為了提高開發(fā)效率,就得讓業(yè)務(wù)邏輯層低于原來同時(shí)操作Redis和MySQL的開發(fā)量。Nebula提供的NebulaMydis就是這樣一個(gè)讓原來同時(shí)操作Redis和MySQL的開發(fā)量(假設(shè)是2)降到1.2左右。
??這個(gè)同時(shí)操作Redis和MySQL的數(shù)據(jù)代理協(xié)議如下:
```C++
syntax = "proto3";
package neb;
message Mydis
{
uint32 section_factor = 1;
RedisOperate redis_operate = 2;
DbOperate db_operate = 3;
message RedisOperate
{
bytes key_name = 1;
string redis_cmd_read = 2;
string redis_cmd_write = 3;
OPERATE_TYPE op_type = 4;
repeated Field fields = 5;
int32 key_ttl = 6;
int32 redis_structure = 7; ///< redis數(shù)據(jù)類型
int32 data_purpose = 8; ///< 數(shù)據(jù)用途
bytes hash_key = 9; ///< 可選hash key,當(dāng)has_hash_key()時(shí)用hash_key來計(jì)算hash值,否則用key_name來計(jì)算hash值
enum OPERATE_TYPE
{
T_READ = 0;
T_WRITE = 1;
}
}
message DbOperate
{
E_QUERY_TYPE query_type = 1; ///< 查詢類型
string table_name = 2; ///< 表名
repeated Field fields = 3; ///< 列類型
repeated ConditionGroup conditions = 4; ///< where條件組(由group_relation指定,若不指定則默認(rèn)為AND關(guān)系)
repeated string groupby_col = 5; ///< group by字段
repeated OrderBy orderby_col = 6; ///< order by字段
ConditionGroup.E_RELATION group_relation = 7; ///< where條件組的關(guān)系,條件組之間有且只有一種關(guān)系(and或者or)
uint32 limit = 8; ///< 指定返回的行數(shù)的最大值 (limit 200)
uint32 limit_from = 9; ///< 指定返回的第一行的偏移量 (limit 100, 200)
uint32 mod_factor = 10; ///< 分表取模因子,當(dāng)這個(gè)字段沒有時(shí)使用section_factor
enum E_QUERY_TYPE ///< 查詢類型
{
SELECT = 0; ///< select查詢
INSERT = 1; ///< insert插入
INSERT_IGNORE = 2; ///< insert ignore插入,若存在則放棄
UPDATE = 3; ///< update更新
REPLACE = 4; ///< replace覆蓋插入
DELETE = 5; ///< delete刪除
}
message Condition ///< where條件
{
E_RELATION relation = 1; ///< 關(guān)系(=, !=, >, <, >=, <= 等)
E_COL_TYPE col_type = 2; ///< 列類型
string col_name = 3; ///< 列名
repeated bytes col_values = 4; ///< 列值(當(dāng)且僅當(dāng)relation為IN時(shí)值的個(gè)數(shù)大于1有效)
string col_name_right = 5; ///< 關(guān)系右邊列名(用于where col1=col2這種情況)
enum E_RELATION
{
EQ = 0; ///< 等于=
NE = 1; ///< 不等于!=
GT = 2; ///< 大于>
LT = 3; ///< 小于<
GE = 4; ///< 大于等于>=
LE = 5; ///< 小于等于<=
LIKE = 6; ///< like
IN = 7; ///< in (1, 2, 3, 4, 5)
}
}
message ConditionGroup ///< where條件組合
{
E_RELATION relation = 1; ///< 條件之間的關(guān)系,一個(gè)ConditionGroup里的所有Condition之間有且只有一種關(guān)系(and或者or)
repeated Condition condition = 2; ///< 條件
enum E_RELATION
{
AND = 0; ///< and且
OR = 1; ///< or或
}
}
message OrderBy
{
E_RELATION relation = 1; ///< 降序或升序
string col_name = 2; ///< 列名
enum E_RELATION
{
ASC = 0;
DESC = 1;
}
}
}
}
enum E_COL_TYPE ///< 列類型
{
STRING = 0; ///< char, varchar, text, datetime, timestamp等
INT = 1; ///< int
BIGINT = 2; ///< bigint
FLOAT = 3; ///< float
DOUBLE = 4; ///< double
}
message Record
{
repeated Field field_info = 1; ///< value data
}
message Field ///< 字段
{
string col_name = 1; ///< 列名
E_COL_TYPE col_type = 2; ///< 列類型
bytes col_value = 3; ///< 列值
string col_as = 4; ///< as列名
}
/**
??這個(gè)協(xié)議分了Redis和MySQL兩部分?jǐn)?shù)據(jù),看似業(yè)務(wù)邏輯層把一份數(shù)據(jù)填充了兩份并沒有降低多少開發(fā)量,實(shí)際上這兩部分?jǐn)?shù)據(jù)有許多是可共用的,只要提供一個(gè)填充類就可以大幅降低協(xié)議填充開發(fā)量。為簡化協(xié)議填充,Nebula提供了幾個(gè)類:同時(shí)填充Redis和MySQL數(shù)據(jù)、只填充Redis、只填充MySQL。
??從Mydis協(xié)議的MySQL部分如何生成SQL語句請(qǐng)參考NebulaDbAgent,核心代碼頭文件如下:
```C++
namespace dbagent
{
const int gc_iMaxBeatTimeInterval = 30;
const int gc_iMaxColValueSize = 65535;
struct tagConnection
{
CMysqlDbi* pDbi;
time_t ullBeatTime;
int iQueryPermit;
int iTimeout;
tagConnection() : pDbi(NULL), ullBeatTime(0), iQueryPermit(0), iTimeout(0)
{
}
~tagConnection()
{
if (pDbi != NULL)
{
delete pDbi;
pDbi = NULL;
}
}
};
class CmdExecSql : public neb::Cmd, public neb::DynamicCreator<CmdExecSql, int32>
{
public:
CmdExecSql(int32 iCmd);
virtual ~CmdExecSql();
virtual bool Init();
virtual bool AnyMessage(
std::shared_ptr<neb::SocketChannel> pChannel,
const MsgHead& oMsgHead,
const MsgBody& oMsgBody);
protected:
bool GetDbConnection(const neb::Mydis& oQuery, CMysqlDbi ppMasterDbi, CMysqlDbi ppSlaveDbi);
bool FetchOrEstablishConnection(neb::Mydis::DbOperate::E_QUERY_TYPE eQueryType,
const std::string& strMasterIdentify, const std::string& strSlaveIdentify,
const neb::CJsonObject& oInstanceConf, CMysqlDbi ppMasterDbi, CMysqlDbi ppSlaveDbi);
std::string GetFullTableName(const std::string& strTableName, uint32 uiFactor);
int ConnectDb(const neb::CJsonObject& oInstanceConf, CMysqlDbi* pDbi, bool bIsMaster = true);
int Query(const neb::Mydis& oQuery, CMysqlDbi* pDbi);
void CheckConnection(); //檢查連接是否已超時(shí)
void Response(int iErrno, const std::string& strErrMsg);
bool Response(const neb::Result& oRsp);
bool CreateSql(const neb::Mydis& oQuery, CMysqlDbi* pDbi, std::string& strSql);
bool CreateSelect(const neb::Mydis& oQuery, std::string& strSql);
bool CreateInsert(const neb::Mydis& oQuery, CMysqlDbi* pDbi, std::string& strSql);
bool CreateUpdate(const neb::Mydis& oQuery, CMysqlDbi* pDbi, std::string& strSql);
bool CreateDelete(const neb::Mydis& oQuery, std::string& strSql);
bool CreateCondition(const neb::Mydis::DbOperate::Condition& oCondition, CMysqlDbi* pDbi, std::string& strCondition);
bool CreateConditionGroup(const neb::Mydis& oQuery, CMysqlDbi* pDbi, std::string& strCondition);
bool CreateGroupBy(const neb::Mydis& oQuery, std::string& strGroupBy);
bool CreateOrderBy(const neb::Mydis& oQuery, std::string& strOrderBy);
bool CreateLimit(const neb::Mydis& oQuery, std::string& strLimit);
bool CheckColName(const std::string& strColName);
private:
std::shared_ptr<neb::SocketChannel> m_pChannel;
MsgHead m_oInMsgHead;
MsgBody m_oInMsgBody;
int m_iConnectionTimeout; //空閑連接超時(shí)(單位秒)
char m_szColValue; //字段值
neb::CJsonObject m_oDbConf;
uint32 m_uiSectionFrom;
uint32 m_uiSectionTo;
uint32 m_uiHash;
uint32 m_uiDivisor;
std::map<std::string, std::set<uint32> > m_mapFactorSection; //分段因子區(qū)間配置,key為因子類型
std::map<std::string, neb::CJsonObject> m_mapDbInstanceInfo; //數(shù)據(jù)庫配置信息key為("%u:%u:%u", uiDataType, uiFactor, uiFactorSection)
std::map<std::string, tagConnection*> m_mapDbiPool; //數(shù)據(jù)庫連接池,key為identify(如:192.168.18.22:3306)
};
} // namespace dbagent
??整個(gè)mydis數(shù)據(jù)協(xié)議是如何解析如何使用,如何做Redis和MySQL的數(shù)據(jù)雙寫、緩存數(shù)據(jù)回寫等不在本文討論范圍,如有興趣可以閱讀[NebulaMydis](https://github.com/Bwar/NebulaMydis)源碼,也可以聯(lián)系Bwar。
### 5. 結(jié)語
??Protobuf用得合適用得好可以解決許多問題,可以提高開發(fā)效率,也可以提高運(yùn)行效率,以上就是Bwar多年應(yīng)用protobuf的小結(jié),沒有任何藏私,文中列出的協(xié)議都可以在開源項(xiàng)目[Nebula](https://github.com/Bwar/Nebula)的這個(gè)路徑[https://github.com/Bwar/Nebula/tree/master/proto](https://github.com/Bwar/Nebula/tree/master/proto)找到。
??開發(fā)Nebula框架目的是致力于提供一種基于C\+\+快速構(gòu)建高性能的分布式服務(wù)。如果覺得本文對(duì)你有用,別忘了到Nebula的[__Github__](https://github.com/Bwar/Nebula)或[__碼云__](https://gitee.com/Bwar/Nebula)給個(gè)star,謝謝。
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。