溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

micro微服務(wù)基礎(chǔ)組件的組織方式是什么

發(fā)布時(shí)間:2021-11-15 16:59:03 來(lái)源:億速云 閱讀:124 作者:iii 欄目:大數(shù)據(jù)

這篇文章主要講解了“micro微服務(wù)基礎(chǔ)組件的組織方式是什么”,文中的講解內(nèi)容簡(jiǎn)單清晰,易于學(xué)習(xí)與理解,下面請(qǐng)大家跟著小編的思路慢慢深入,一起來(lái)研究和學(xué)習(xí)“micro微服務(wù)基礎(chǔ)組件的組織方式是什么”吧!

簡(jiǎn)介

micro是go語(yǔ)言實(shí)現(xiàn)的一個(gè)微服務(wù)框架,該框架自身實(shí)現(xiàn)了為服務(wù)常見的幾大要素,網(wǎng)關(guān),代理,注冊(cè)中心,消息傳遞,也支持可插拔擴(kuò)展。本本通過micro中的一個(gè)核心對(duì)象展開去探討這個(gè)項(xiàng)目是如何實(shí)現(xiàn)這些組件并將其組織在一起工作的。

Notice: go代碼有時(shí)候比較繁瑣,截取源碼的時(shí)候會(huì)刪除部分不影響思想的代碼會(huì)標(biāo)記為...

核心服務(wù)

micro通過micro.NewService創(chuàng)建一個(gè)服務(wù)實(shí)例,所有的微服務(wù)實(shí)例(包括網(wǎng)關(guān),代理等等)需要通過這個(gè)實(shí)例來(lái)與其他服務(wù)打交道。

    type service struct {
        opts Options

        once sync.Once
    }

代碼很少,但是micro中所有微服務(wù)要素最后都會(huì)匯聚在這個(gè)類型上了,因?yàn)閛ption包羅萬(wàn)象(^^).

    type Options struct {
        Broker    broker.Broker
        Cmd       cmd.Cmd
        Client    client.Client
        Server    server.Server
        Registry  registry.Registry
        Transport transport.Transport

        // Before and After funcs
        BeforeStart []func() error
        BeforeStop  []func() error
        AfterStart  []func() error
        AfterStop   []func() error

        // Other options for implementations of the interface
        // can be stored in a context
        Context context.Context
    }

這里可以看到微服務(wù)的常見組件了,當(dāng)micro.Server初始化的時(shí)候會(huì)給這個(gè)對(duì)象設(shè)置上對(duì)應(yīng)的組件,組件的設(shè)置方式包括默認(rèn)值,cli指定,env讀取。

func (s *service) Init(opts ...Option) {
	// process options
	for _, o := range opts {
		o(&s.opts)
	}

	s.once.Do(func() {
		// Initialise the command flags, overriding new service
		_ = s.opts.Cmd.Init(
			cmd.Broker(&s.opts.Broker),
			cmd.Registry(&s.opts.Registry),
			cmd.Transport(&s.opts.Transport),
			cmd.Client(&s.opts.Client),
			cmd.Server(&s.opts.Server),
		)
	})
}

Server是最終的行為實(shí)體,監(jiān)聽端口并提供業(yè)務(wù)服務(wù),注冊(cè)本地服務(wù)到注冊(cè)中心。 Broker異步消息,mq等方法可以替換這個(gè)類型 Client服務(wù)調(diào)用客戶端 Registry 注冊(cè)中心 Cmd cli客戶端 Transport 類似與socket,消息同步通信,服務(wù)監(jiān)聽等

服務(wù)類型

目前支持的服務(wù)類型有rpc,grpc。服務(wù)中如果存在兩種不同的rpc協(xié)議,消息投遞的時(shí)候會(huì)進(jìn)行協(xié)議轉(zhuǎn)換。這里詳細(xì)說下默認(rèn)的rpc服務(wù)。 rpc服務(wù)基于HTTP POST協(xié)議,服務(wù)啟動(dòng)的時(shí)候會(huì)嘗試連接Broker,然后注冊(cè)本服務(wù)到注冊(cè)中心,最后監(jiān)聽服務(wù)端口.簡(jiǎn)單提一句這里是如何做到協(xié)議轉(zhuǎn)換,如果http過來(lái)的消息要投遞到一個(gè)grpc協(xié)議服務(wù)上,需要在Content-Type設(shè)置對(duì)應(yīng)目的服務(wù)協(xié)議類型application/grpc,SerConn這里讀取在Content-Type在獲取對(duì)應(yīng)的Codec進(jìn)行協(xié)議轉(zhuǎn)換,最后投遞到對(duì)應(yīng)服務(wù),服務(wù)的返回內(nèi)容也同樣要處理下再返回。

    type Service interface {
        Init(...Option)
        Options() Options
        Client() client.Client
        Server() server.Server
        Run() error
        String() string
    }

rpc server

func (s *rpcServer) Start() error {
...
	ts, err := config.Transport.Listen(config.Address)
	// swap address
...
	// connect to the broker
	if err := config.Broker.Connect(); err != nil {
		return err
	}
...
	// use RegisterCheck func before register
	if err = s.opts.RegisterCheck(s.opts.Context); err != nil {
		log.Logf("Server %s-%s register check error: %s", config.Name, config.Id, err)
	} else {
		// announce self to the world
		if err = s.Register(); err != nil {
			log.Logf("Server %s-%s register error: %s", config.Name, config.Id, err)
		}
	}
...
	go func() {
		for {
			// listen for connections
			err := ts.Accept(s.ServeConn)
...
		}
	}()
..
	return nil
}

協(xié)議轉(zhuǎn)換

    func (s *rpcServer) ServeConn(sock transport.Socket) {
...
        for {
            var msg transport.Message
            if err := sock.Recv(&msg); err != nil {
                return
            }
...
            // we use this Content-Type header to identify the codec needed
            ct := msg.Header["Content-Type"]

            // strip our headers
            hdr := make(map[string]string)
            for k, v := range msg.Header {
                hdr[k] = v
            }

            // set local/remote ips
            hdr["Local"] = sock.Local()
            hdr["Remote"] = sock.Remote()
...
            // TODO: needs better error handling
            var err error
            if cf, err = s.newCodec(ct); err != nil {  //請(qǐng)求協(xié)議轉(zhuǎn)換器
...返回錯(cuò)誤
                return
            }
...
            rcodec := newRpcCodec(&msg, sock, cf)   //返回轉(zhuǎn)換器

            // internal request
            request := &rpcRequest{
                service:     getHeader("Micro-Service", msg.Header),
                method:      getHeader("Micro-Method", msg.Header),
                endpoint:    getHeader("Micro-Endpoint", msg.Header),
                contentType: ct,
                codec:       rcodec,
                header:      msg.Header,
                body:        msg.Body,
                socket:      sock,
                stream:      true,
            }

            // internal response
            response := &rpcResponse{
                header: make(map[string]string),
                socket: sock,
                codec:  rcodec,
            }

            // set router
            r := Router(s.router)
...
            // serve the actual request using the request router
            if err := r.ServeRequest(ctx, request, response); err != nil {
                // write an error response
                err = rcodec.Write(&codec.Message{
                    Header: msg.Header,
                    Error:  err.Error(),
                    Type:   codec.Error,
                }, nil)
                // could not write the error response
                if err != nil {
                    log.Logf("rpc: unable to write error response: %v", err)
                }
                if s.wg != nil {
                    s.wg.Done()
                }
                return
            }
...
        }
    }

注冊(cè)中心

注冊(cè)中心包括服務(wù)發(fā)現(xiàn)和服務(wù)注冊(cè)。micro中每個(gè)注冊(cè)中心類型都要實(shí)現(xiàn)注冊(cè)中心接口

    type Registry interface {
        Init(...Option) error
        Options() Options
        Register(*Service, ...RegisterOption) error
        Deregister(*Service) error
        GetService(string) ([]*Service, error)
        ListServices() ([]*Service, error)
        Watch(...WatchOption) (Watcher, error)
        String() string
    }

默認(rèn)的注冊(cè)中心是mdns,該程序會(huì)在本地監(jiān)聽一個(gè)組播地址接收網(wǎng)絡(luò)中所有及其廣播的信息,同時(shí)發(fā)送的信息也能被所有其他機(jī)器發(fā)現(xiàn)。每當(dāng)程序啟動(dòng)時(shí)都會(huì)廣播自己的服務(wù)信息,其他節(jié)點(diǎn)收到該信息后添加到自己的服務(wù)列表里面,服務(wù)關(guān)閉時(shí)會(huì)發(fā)出關(guān)閉信息。mdns自身不具備健康檢查,熔斷等功能,其出發(fā)點(diǎn)也僅是便于測(cè)試使用,因而不推薦在生產(chǎn)環(huán)境中使用。

Resolve

查找服務(wù)需要根據(jù)url或者content信息來(lái)獲取服務(wù)名稱,在通過服務(wù)名稱到注冊(cè)中心查找,獲取到服務(wù)后,隨機(jī)一個(gè)節(jié)點(diǎn)投遞

    type Resolver interface {
        Resolve(r *http.Request) (*Endpoint, error)
        String() string
    }
    //默認(rèn)的api resolve實(shí)例,除此之外還有host,path,grpc 三種resolve,可以根據(jù)需求在啟動(dòng)程序的時(shí)候指定或者設(shè)置在環(huán)境變量里面
    //
    func (r *Resolver) Resolve(req *http.Request) (*resolver.Endpoint, error) {
        var name, method string

        switch r.Options.Handler {
        // internal handlers
        case "meta", "api", "rpc", "micro":
        ///foo/bar/zool => go.micro.api.foo  方法:Bar.Zool
            ///foo/bar => go.micro.api.foo 方法:Foo.Bar
            name, method = apiRoute(req.URL.Path)
        default:
            //如果handler是web會(huì)走到這里   
            ///foo/bar/zool => go.micro.api.foo  method bar/zool 
            // 1/foo/bar/ => go.micro.api.1.foo  method bar 
            method = req.Method
            name = proxyRoute(req.URL.Path)
        }

        return &resolver.Endpoint{
            Name:   name,
            Method: method,
        }, nil
    }

插件

代碼中定義了一個(gè)plugin,然而這個(gè)plugin并非是引入組件的作用,其作用在于在請(qǐng)求的管道中加一層,這樣比較容易添加新的邏輯進(jìn)去。然而真正需要著重提到的是micro引入新組件的方式,micro service的幾個(gè)重要成員都有其對(duì)應(yīng)的接口規(guī)約,只要正確實(shí)現(xiàn)了接口就可以輕松的接入新的組件。這里用一個(gè)引入kubernetes作為注冊(cè)中心的例子。

kubernetes組件位于github.com\micro\go-plugins中

    go get -u github.com\micro\go-plugins\kubernetes

由于在go中無(wú)法實(shí)現(xiàn)java/c#中包動(dòng)態(tài)加載功能,語(yǔ)言本身不提供全局類型,函數(shù)掃描。往往只能通過一些曲折的辦法來(lái)實(shí)現(xiàn)。而micro是通過在入口文件中導(dǎo)入包,利用init函數(shù)在啟動(dòng)時(shí)將需要的功能組件寫入到一個(gè)map里面。 在import中加入插件

    _ "github.com/micro/go-plugins/registry/kubernetes"
   micro api --registry=kubernetes  --registry_address=yourAddress

工作原理: 在github.com\micro\go-micro\config\cmd\cmd.go中有一個(gè)map用于保存所有的注冊(cè)中心創(chuàng)建函數(shù)

	DefaultRegistries = map[string]func(...registry.Option) registry.Registry{
		"consul": consul.NewRegistry,
		"gossip": gossip.NewRegistry,
		"mdns":   mdns.NewRegistry,
		"memory": rmem.NewRegistry,
	}

kubernetes會(huì)在包的init中寫入對(duì)應(yīng)的創(chuàng)建函數(shù)

    func init() {
        cmd.DefaultRegistries["kubernetes"] = NewRegistry
    }

生效的位置

    if name := ctx.String("registry"); len(name) > 0 && (*c.opts.Registry).String() != name {
		r, ok := c.opts.Registries[name]
		if !ok {
			return fmt.Errorf("Registry %s not found", name)
		}

		*c.opts.Registry = r()
		serverOpts = append(serverOpts, server.Registry(*c.opts.Registry))
		clientOpts = append(clientOpts, client.Registry(*c.opts.Registry))

		if err := (*c.opts.Selector).Init(selector.Registry(*c.opts.Registry)); err != nil {
			log.Fatalf("Error configuring registry: %v", err)
		}

		clientOpts = append(clientOpts, client.Selector(*c.opts.Selector))

		if err := (*c.opts.Broker).Init(broker.Registry(*c.opts.Registry)); err != nil {
			log.Fatalf("Error configuring broker: %v", err)
		}
	}

最后附上一張圖說明下這個(gè)核心的對(duì)象的引用關(guān)系,組件引用那一塊只是畫了注冊(cè)中心的,Broker,Server也都是類似的原理。 micro微服務(wù)基礎(chǔ)組件的組織方式是什么

感謝各位的閱讀,以上就是“micro微服務(wù)基礎(chǔ)組件的組織方式是什么”的內(nèi)容了,經(jīng)過本文的學(xué)習(xí)后,相信大家對(duì)micro微服務(wù)基礎(chǔ)組件的組織方式是什么這一問題有了更深刻的體會(huì),具體使用情況還需要大家實(shí)踐驗(yàn)證。這里是億速云,小編將為大家推送更多相關(guān)知識(shí)點(diǎn)的文章,歡迎關(guān)注!

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI