溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點(diǎn)擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

Golang如何實(shí)現(xiàn)簡易的rpc調(diào)用

發(fā)布時間:2023-03-06 16:35:36 來源:億速云 閱讀:98 作者:iii 欄目:開發(fā)技術(shù)

這篇文章主要介紹“Golang如何實(shí)現(xiàn)簡易的rpc調(diào)用”,在日常操作中,相信很多人在Golang如何實(shí)現(xiàn)簡易的rpc調(diào)用問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”Golang如何實(shí)現(xiàn)簡易的rpc調(diào)用”的疑惑有所幫助!接下來,請跟著小編一起來學(xué)習(xí)吧!

    RPC(Remote Procedure Call Protocol)遠(yuǎn)程過程調(diào)用協(xié)議。 一個通俗的描述是:客戶端在不知道調(diào)用細(xì)節(jié)的情況下,調(diào)用存在于遠(yuǎn)程計算機(jī)上的某個對象,就像調(diào)用本地應(yīng)用程序中的對象一樣。 比較正式的描述是:一種通過網(wǎng)絡(luò)從遠(yuǎn)程計算機(jī)程序上請求服務(wù),而不需要了解底層網(wǎng)絡(luò)技術(shù)的協(xié)議 從使用的方面來說,服務(wù)端和客戶端通過TCP/UDP/HTTP等通訊協(xié)議通訊,在通訊的時候客戶端指定好服務(wù)端的方法、參數(shù)等信息通過序列化傳送到服務(wù)端,服務(wù)端可以通過已有的元信息找到需要調(diào)用的方法,然后完成一次調(diào)用后序列化返回給客戶端(rpc更多的是指服務(wù)與服務(wù)之間的通信,可以使用效率更高的協(xié)議和序列化格式去進(jìn)行,并且可以進(jìn)行有效的負(fù)載均衡和熔斷超時等,因此跟前后端之間的web的交互概念上是有點(diǎn)不一樣的) 用一張簡單的圖來表示

    Golang如何實(shí)現(xiàn)簡易的rpc調(diào)用

    開始

    本文只實(shí)現(xiàn)一個rpc框架基本的功能,不對性能做保證,因此盡量使用go原生自帶的net/json庫等進(jìn)行操作,對使用方面不做stub(偷懶,只使用簡單的json格式指定需要調(diào)用的方法),用最簡單的方式實(shí)現(xiàn)一個簡易rpc框架,也不保證超時調(diào)用和服務(wù)發(fā)現(xiàn)等集成的邏輯。

    實(shí)現(xiàn)兩點(diǎn)之間的通訊(transport)

    本段先實(shí)現(xiàn)兩端之間的通訊,只確保兩個端之間能互相通訊即可 server.go

    package server
    
    import (
    	"fmt"
    	"log"
    	"net"
    )
    
    // Server: transport底層實(shí)現(xiàn),通過Server去接受客戶端的字節(jié)流
    type Server struct {
    	ls   net.Listener
    	port int
    }
    
    // NewServer: 根據(jù)端口創(chuàng)建一個server
    func NewServer(port int) *Server {
    	s := &Server{port: port}
    	s.init()
    	return s
    }
    
    // init: 初始化服務(wù)端連接
    func (s *Server) init() {
    	l, err := net.Listen("tcp", fmt.Sprintf("127.0.0.1:%d", s.port))
    	if err != nil {
    		panic(err)
    	}
    	s.ls = l
    }
    
    // Start: 啟動服務(wù)端的端口監(jiān)聽,采取一個conn一個g的模型,沒有使用reactor等高性能模型
    func (s *Server) Start() {
    	go func() {
    		log.Printf("server [%s] start....", s.ls.Addr().String())
    		for {
    			conn, err := s.ls.Accept()
    			if err != nil {
    				panic(err)
    			}
    			go func() {
    				buf := make([]byte, 1024)
    				for {
    					idx, err := conn.Read(buf)
    					if err != nil {
    						panic(err)
    					}
    					if len(buf) == 0 {
    						continue
    					}
    					// todo 等序列化的信息
    					log.Printf("[conn: %v] get data: %v\n", conn.RemoteAddr(), string(buf[:idx]))
    
    				}
    			}()
    		}
    	}()
    
    }
    
    // Close: 關(guān)閉服務(wù)監(jiān)聽
    func (s *Server) Close() error {
    	return s.ls.Close()
    }
    
    
    // Close: 關(guān)閉服務(wù)監(jiān)聽
    func (s *Server) Close() error {
    	return s.ls.Close()
    }

    client.go

    package client
    
    import (
    	"fmt"
    	"log"
    	"net"
    	"unsafe"
    )
    
    type Client struct {
    	port int
    	conn net.Conn
    }
    
    func NewClient(port int) *Client {
    	c := &Client{port: port}
    	c.init()
    	return c
    }
    
    // init: initialize tcp client
    func (c *Client) init() {
    	conn, err := net.Dial("tcp", fmt.Sprintf("127.0.0.1:%d", c.port))
    	if err != nil {
    		panic(err)
    	}
    	c.conn = conn
    
    }
    
    func (c *Client) Send(statement string) error {
    	_, err := c.conn.Write(*(*[]byte)(unsafe.Pointer(&statement)))
    	if err != nil {
    		panic(err)
    	}
    	return nil
    }
    
    // Close: use to close connection
    func (c *Client) Close() error {
    	return c.conn.Close()
    }

    使用main.go做測試 main.go

    package main
    
    import (
    	"rpc_demo/internal/client"
    	"rpc_demo/internal/server"
    	"time"
    )
    
    func main() {
    	s := server.NewServer(9999)
    	s.Start()
    	time.Sleep(5 * time.Second)
    	c := client.NewClient(9999)
    	c.Send("this is a test\n")
    	time.Sleep(5 * time.Second)
    }

    執(zhí)行一次main.go, go run main.go

    2023/03/05 14:39:11 server [127.0.0.1:9999] start....
    2023/03/05 14:39:16 [conn: 127.0.0.1:59126] get data: this is a test

    可以證明第一部分的任務(wù)已經(jīng)完成,可以實(shí)現(xiàn)兩端之間的通訊了

    實(shí)現(xiàn)反射調(diào)用已注冊的方法

    實(shí)現(xiàn)了雙端的通信以后,我們在internal.go里實(shí)現(xiàn)兩個方法,一個是注冊,一個是調(diào)用,因?yàn)間o有運(yùn)行時的反射,所以我們使用反射去注冊每一個需要調(diào)用到的方法,然后提供全局唯一的函數(shù)名,讓client端可以實(shí)現(xiàn)指定方法的調(diào)用

    internal.go

    package internal
    
    import (
    	"errors"
    	"fmt"
    	"reflect"
    	"runtime"
    	"strings"
    )
    
    // 全局唯一
    var GlobalMethod = &Method{methods: map[string]reflect.Value{}}
    
    type Method struct {
    	methods map[string]reflect.Value
    }
    
    func (m *Method) register(impl interface{}) error {
    	pl := reflect.ValueOf(impl)
    	if pl.Kind() != reflect.Func {
    		return errors.New("impl should be function")
    	}
    	// 獲取函數(shù)名
    	methodName := runtime.FuncForPC(pl.Pointer()).Name()
    	if len(strings.Split(methodName, ".")) < 1 {
    		return errors.New("invalid function name")
    	}
    	lastFuncName := strings.Split(methodName, ".")[1]
    	m.methods[lastFuncName] = pl
    	fmt.Printf("methods: %v\n", m.methods)
    	return nil
    }
    
    func (m *Method) call(methodName string, callParams ...interface{}) ([]interface{}, error) {
    	fn, ok := m.methods[methodName]
    	if !ok {
    		return nil, errors.New("impl method not found! Please Register first")
    	}
    	in := make([]reflect.Value, len(callParams))
    	for i := 0; i < len(callParams); i++ {
    		in[i] = reflect.ValueOf(callParams[i])
    	}
    	res := fn.Call(in)
    	out := make([]interface{}, len(res))
    	for i := 0; i < len(res); i++ {
    		out[i] = res[i].Interface()
    	}
    	return out, nil
    }
    
    func Call(methodName string, callParams ...interface{}) ([]interface{}, error) {
    	return GlobalMethod.call(methodName, callParams...)
    }
    
    func Register(impl interface{}) error {
    	return GlobalMethod.register(impl)
    }

    在單測里測試一下這個注冊和調(diào)用的功能internal_test.go

    package internal
    
    import (
    	"testing"
    )
    
    func Sum(a, b int) int {
    	return a + b
    }
    func TestRegister(t *testing.T) {
    	err := Register(Sum)
    	if err != nil {
    		t.Fatalf("err: %v\n", err)
    	}
    	t.Logf("test success\n")
    }
    
    func TestCall(t *testing.T) {
    	TestRegister(t)
    	result, err := Call("Sum", 1, 2)
    	if err != nil {
    		t.Fatalf("err: %v\n", err)
    	}
    	if len(result) != 1 {
    		t.Fatalf("len(result) is not equal to 1\n")
    	}
    	t.Logf("Sum(1,2) = %d\n", result[0].(int))
    	if err := recover(); err != nil {
    		t.Fatalf("%v\n", err)
    	}
    }

    執(zhí)行調(diào)用

    /usr/local/go/bin/go test -timeout 30s -run ^TestCall$ rpc_demo/internal -v

    Running tool: /usr/local/go/bin/go test -timeout 30s -run ^TestCall$ rpc_demo/internal -v

    === RUN   TestCall
    methods: map[Sum:<func(int, int) int Value>]
        /root/go/src/juejin_demo/rpc_demo/internal/internal_test.go:15: test success
        /root/go/src/juejin_demo/rpc_demo/internal/internal_test.go:27: Sum(1,2) = 3
    --- PASS: TestCall (0.00s)
    PASS
    ok      rpc_demo/internal    0.002s

    可以看到這個注冊和調(diào)用的過程已經(jīng)實(shí)現(xiàn)并且達(dá)到指定方法調(diào)用的作用

    設(shè)計struct完整表達(dá)一次完整的rpc調(diào)用,并且封裝json庫中的Decoder和Encoder,完成序列化和反序列化

    internal.go

    type RpcRequest struct {
    	MethodName string
    	Params     []interface{}
    }
    
    type RpcResponses struct {
    	Returns []interface{}
    	Err     error
    }

    transport.go考慮可以對接更多的格式,所以抽象了一層進(jìn)行使用(demo肯定沒有更多格式了)

    package transport
    
    // Transport: 序列化格式的抽象層,從connection中讀取數(shù)據(jù)序列化并且反序列化到connection中
    type Transport interface {
    	Decode(v interface{}) error
    	Encode(v interface{}) error
    	Close()
    }

    json_transport.go

    package transport
    
    import (
    	"encoding/json"
    	"net"
    )
    
    var _ Transport = (*JSONTransport)(nil)
    
    type JSONTransport struct {
    	encoder *json.Encoder
    	decoder *json.Decoder
    }
    
    // NewJSONTransport: 負(fù)責(zé)讀取和寫入conn
    func NewJSONTransport(conn net.Conn) *JSONTransport {
    	return &JSONTransport{json.NewEncoder(conn), json.NewDecoder(conn)}
    }
    
    // Decode: use json package to decode
    func (t *JSONTransport) Decode(v interface{}) error {
    	if err := t.decoder.Decode(v); err != nil {
    		return err
    	}
    	return nil
    }
    
    // Encode: use json package to encode
    func (t *JSONTransport) Encode(v interface{}) error {
    	if err := t.encoder.Encode(v); err != nil {
    		return err
    	}
    	return nil
    }
    
    // Close: not implement
    func (dec *JSONTransport) Close() {
    
    }

    然后我們將服務(wù)端和客戶端的邏輯進(jìn)行修改,改成通過上面兩個結(jié)構(gòu)體進(jìn)行通信,然后返回一次調(diào)用 server.go

    //...
    		for {
    			conn, err := s.ls.Accept()
    			if err != nil {
    				panic(err)
    			}
    			tsp := transport.NewJSONTransport(conn)
    			go func() {
    				for {
    					request := &internal.RpcRequest{}
    					err := tsp.Decode(request)
    					if err != nil {
    						panic(err)
    					}
    					log.Printf("[server] get request: %v\n", request)
    					result, err := internal.Call(request.MethodName, request.Params...)
    					log.Printf("[server] invoke method: %v\n", result)
    					if err != nil {
    						response := &internal.RpcResponses{Returns: nil, Err: err}
    						tsp.Encode(response)
    						continue
    					}
    					response := &internal.RpcResponses{Returns: result, Err: err}
    					if err := tsp.Encode(response); err != nil {
    						log.Printf("[server] encode response err: %v\n", err)
    						continue
    					}
    				}
    			}()
    		}
            //...

    client.go

    // ...
    // Call: remote invoke
    func (c *Client) Call(methodName string, params ...interface{}) (res *internal.RpcResponses) {
    	request := internal.RpcRequest{MethodName: methodName, Params: params}
    	log.Printf("[client] create request to invoke server: %v\n", request)
    	err := c.tsp.Encode(request)
    	if err != nil {
    		panic(err)
    	}
    	res = &internal.RpcResponses{}
    	if err := c.tsp.Decode(res); err != nil {
    		panic(err)
    	}
    	log.Printf("[client] get response from server: %v\n", res)
    	return res
    }
    // ...

    main.go

    package main
    
    import (
    	"log"
    	"rpc_demo/internal"
    	"rpc_demo/internal/client"
    	"rpc_demo/internal/server"
    	"strings"
    	"time"
    )
    
    // Rpc方法的一個簡易實(shí)現(xiàn)
    func Join(a ...string) string {
    	res := &strings.Builder{}
    	for i := 0; i < len(a); i++ {
    		res.WriteString(a[i])
    	}
    	return res.String()
    }
    
    func main() {
    	internal.Register(Join)
    	s := server.NewServer(9999)
    	s.Start()
    	time.Sleep(5 * time.Second)
    	c := client.NewClient(9999)
    	res := c.Call("Join", "aaaaa", "bbbbb", "ccccccccc", "end")
    	if res.Err != nil {
    		log.Printf("[main] get an error from server: %v\n", res.Err)
    		return
    	}
    	log.Printf("[main] get a response from server: %v\n", res.Returns[0].(string))
    	time.Sleep(5 * time.Second)
    }

    接下來我們運(yùn)行一下main

    [root@hecs-74066 rpc_demo]# go run main.go 
    2023/03/05 14:39:11 server [127.0.0.1:9999] start....
    2023/03/05 14:39:16 [conn: 127.0.0.1:59126] get data: this is a test

    [root@hecs-74066 rpc_demo]# go run main.go 
    2023/03/05 21:53:41 server [127.0.0.1:9999] start....
    2023/03/05 21:53:46 [client] create request to invoke server: {Join [aaaaa bbbbb ccccccccc end]}
    2023/03/05 21:53:46 [server] get request: &{Join [aaaaa bbbbb ccccccccc end]}
    2023/03/05 21:53:46 [server] invoke method: [aaaaabbbbbcccccccccend]
    2023/03/05 21:53:46 [client] get response from server: &{[aaaaabbbbbcccccccccend] <nil>}
    2023/03/05 21:53:46 [main] get a response from server: aaaaabbbbbcccccccccend

    到此,關(guān)于“Golang如何實(shí)現(xiàn)簡易的rpc調(diào)用”的學(xué)習(xí)就結(jié)束了,希望能夠解決大家的疑惑。理論與實(shí)踐的搭配能更好的幫助大家學(xué)習(xí),快去試試吧!若想繼續(xù)學(xué)習(xí)更多相關(guān)知識,請繼續(xù)關(guān)注億速云網(wǎng)站,小編會繼續(xù)努力為大家?guī)砀鄬?shí)用的文章!

    向AI問一下細(xì)節(jié)

    免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報,并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

    AI