您好,登錄后才能下訂單哦!
這篇文章主要介紹了怎么用Python編寫簡單的gRPC服務(wù),具有一定借鑒價值,感興趣的朋友可以參考下,希望大家閱讀完這篇文章之后大有收獲,下面讓小編帶著大家一起了解一下。
gRPC 是可以在任何環(huán)境中運行的現(xiàn)代開源高性能 RPC 框架。它可以通過可插拔的支持來有效地連接數(shù)據(jù)中心內(nèi)和跨數(shù)據(jù)中心的服務(wù),以實現(xiàn)負(fù)載平衡,跟蹤,運行狀況檢查和身份驗證。它也適用于分布式計算的最后一英里,以將設(shè)備,移動應(yīng)用程序和瀏覽器連接到后端服務(wù)。
grpc官網(wǎng)python參考:https://www.grpc.io/docs/languages/python/quickstart/
http://grpc.github.io/grpc/python/grpc.html
Python 3.5 or higher
pip version 9.0.1 or higher
安裝gRPC相關(guān)的庫
grpcio-tools主要用根據(jù)我們的protocol buffer定義來生成Python代碼,官方解釋是Protobuf code generator for gRPC。
#apt install python3-pip pip install grpcio pip install protobuf pip install grpcio_tools
proto是一個協(xié)議文件,客戶端和服務(wù)器的通信接口正是通過proto文件協(xié)定的,可以根據(jù)不同語言生成對應(yīng)語言的代碼文件。
heartbeat.proto文件:
syntax = "proto3"; message HeartbeatRequest{ string Host = 1; int32 Mem = 2; int32 Disk = 3; int32 Cpu = 4; int64 Timestamp = 5; int64 Seq = 6; } message HeartbeatResponse{ int32 ErrCode = 1; string ErrMsg = 2; }
heartbeat_service.proto
syntax = "proto3"; import "heartbeat.proto"; // HeartBeatService service HeartBeatService{ rpc HeartBeat(HeartbeatRequest) returns(HeartbeatResponse){} }
核心 就是一個 用于生成需要用到數(shù)據(jù)類型的文件;一個就是用于生成相關(guān)調(diào)用方法的類。 一個定義數(shù)據(jù)類型,一個用于定義方法。
proto文件需要通過protoc生成對應(yīng)的.py文件。protoc的下載地址 。下載解壓之后,將解壓目錄添加到path的環(huán)境變量中。
pip install grpcio install grpcio-tools #pip install --upgrade protobuf
注意:【下面命令是在proto文件所在的目錄執(zhí)行的,-I 用來指定proto的目錄是 . 】
python -m grpc_tools.protoc -I=. --python_out=.. heartbeat.proto python -m grpc_tools.protoc -I=. --grpc_python_out=.. heartbeat_service.proto
-I 指定proto所在目錄
-m 指定通過protoc生成py文件
–python_out生成py文件的輸出路徑
heartbeat.proto、heartbeat_service.proto為 輸入的proto文件
生成的文件名中 xxx_pb2.py 就是我們剛才創(chuàng)建數(shù)據(jù)結(jié)構(gòu)文件,里面有定義函數(shù)參數(shù)和返回數(shù)據(jù)結(jié)構(gòu); xxx_pb2_grpc.py 就是我們定義的函數(shù),定義了我們客服端rpc將來要調(diào)用方法。
服務(wù)端
#!/usr/bin/env python # coding=utf-8 import sys from concurrent import futures import time import grpc from google.protobuf.json_format import MessageToJson import heartbeat_service_pb2_grpc import heartbeat_pb2 from lib.core.log import LOGGER class HeartBeatSrv(heartbeat_service_pb2_grpc.HeartBeatServiceServicer): def HeartBeat(self, msg, context): try: # LOGGER.info(MessageToJson(msg, preserving_proto_field_name=True)) body = MessageToJson(msg, preserving_proto_field_name=True) LOGGER.info("Get Heartbeat Request: %s", body) response = heartbeat_pb2.HeartbeatResponse() response.ErrCode = 0000 response.ErrMsg = "success" return response except Exception as e: print("exception in heartbeat") LOGGER.error("RPC Service exception: %s", e) response = heartbeat_pb2.HeartbeatResponse() response.ErrCode = 500 response.ErrMsg = "rpc error: %s" % e return response def server(host, rpc_port): # 這里通過thread pool來并發(fā)處理server的任務(wù) # 定義服務(wù)器并設(shè)置最大連接數(shù),concurrent.futures是一個并發(fā)庫,類似于線程池的概念 grpc_server = grpc.server(futures.ThreadPoolExecutor(max_workers=100)) # 不使用SSL grpc_server.add_insecure_port('[::]' + ':' + str(rpc_port)) # This method is only safe to call before the server is started. # 綁定處理器HeartBeatSrv(自己實現(xiàn)了處理函數(shù)) heartbeat_service_pb2_grpc.add_HeartBeatServiceServicer_to_server(HeartBeatSrv(), grpc_server) # 該方法只能調(diào)用一次, start() 不會阻塞 # 啟動服務(wù)器 grpc_server.start() LOGGER.info("server start...") while 1: time.sleep(10) #grpc_server.wait_for_termination() def main(): try: LOGGER.info("begin start server") rpc_port = 8090 host = "::" server(host, rpc_port) except Exception as e: LOGGER.error("server start error: %s", e) time.sleep(5) if __name__ == '__main__': LOGGER.info(sys.path) main()
客戶端
from time import sleep import grpc import heartbeat_pb2 import heartbeat_service_pb2_grpc from lib.core.log import LOGGER def run(seq): option = [('grpc.keepalive_timeout_ms', 10000)] # with grpc.insecure_channel(target='127.0.0.1:8090', options=option) as channel: # 客戶端實例 stub = heartbeat_service_pb2_grpc.HeartBeatServiceStub(channel) # stub調(diào)用服務(wù)端方法 response = stub.HeartBeat(heartbeat_pb2.HeartbeatRequest(Host='hello grpc', Seq=seq), timeout=10) LOGGER.info("response ErrCode:%s", response.ErrCode) if __name__ == '__main__': for i in range(1, 10000): LOGGER.info("i: %s", i) sleep(3) run(i)
感謝你能夠認(rèn)真閱讀完這篇文章,希望小編分享的“怎么用Python編寫簡單的gRPC服務(wù)”這篇文章對大家有幫助,同時也希望大家多多支持億速云,關(guān)注億速云行業(yè)資訊頻道,更多相關(guān)知識等著你來學(xué)習(xí)!
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。