溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點(diǎn)擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

如何理解Docker中的容器日志處理與log-driver實(shí)現(xiàn)

發(fā)布時(shí)間:2021-11-24 16:18:38 來源:億速云 閱讀:122 作者:柒染 欄目:云計(jì)算

這篇文章給大家介紹如何理解Docker中的容器日志處理與log-driver實(shí)現(xiàn),內(nèi)容非常詳細(xì),感興趣的小伙伴們可以參考借鑒,希望對大家能有所幫助。

概要

小編將從docker(1.12.6)源碼的角度分析docker daemon怎么將容器的日志收集出來并通過配置的log-driver發(fā)送出去,并結(jié)合示例介紹了好雨云幫中實(shí)現(xiàn)的一個(gè)zmq-loger。

閱讀準(zhǔn)備 

(1)首先你需要認(rèn)知以下幾個(gè)關(guān)鍵詞:

  • stdout:
    標(biāo)準(zhǔn)輸出,進(jìn)程寫數(shù)據(jù)的流。

  • stderr:
    錯誤輸出,進(jìn)程寫錯誤數(shù)據(jù)的流。

  • 子進(jìn)程:
    由一個(gè)進(jìn)程(父進(jìn)程)創(chuàng)建的進(jìn)程,集成父進(jìn)程大部分屬性,同時(shí)可以被父進(jìn)程守護(hù)和管理。

(2)你需要知道關(guān)于進(jìn)程產(chǎn)生日志的形式
進(jìn)程產(chǎn)生日志有兩類輸出方式,一類是寫入到文件中。另一類是直接寫到stdout或者stderr,例如php的echo python的print golang的fmt.Println("")等等。
(3)是否知道docker-daemon與運(yùn)行中container的關(guān)系? 一個(gè)container就是一個(gè)特殊的進(jìn)程,它是由docker daemon創(chuàng)建并啟動,因此container是docker daemon的子進(jìn)程。由docker daemon守護(hù)和管理。因此container的stdout能夠被docker daemon獲取到。基于此理論,我們來分析docker daemon相關(guān)代碼。

docker-daemon關(guān)于日志源碼分析

container實(shí)例源碼

# /container/container.go:62
type CommonContainer struct{
    StreamConfig *stream.Config
    ...
}
# /container/stream/streams.go:26
type Config struct {
	sync.WaitGroup
	stdout    *broadcaster.Unbuffered
	stderr    *broadcaster.Unbuffered
	stdin     io.ReadCloser
	stdinPipe io.WriteCloser
}

找到如上所示對應(yīng)的代碼,顯示了每一個(gè)container實(shí)例都有幾個(gè)屬性stdout,stderr,stdin,以及管道stdinPipe。這里說下stdinPipe,當(dāng)容器使用-i參數(shù)啟動時(shí)標(biāo)準(zhǔn)輸入將被運(yùn)行,daemon將能夠使用此管道向容器內(nèi)寫入標(biāo)準(zhǔn)輸入。

![2017011930658image2017-1-18 17-18-38.png](http://7xqmjb.com1.z0.glb.clouddn.com/2017011930658image2017-1-18 17-18-38.png)

我們試想以上圖例,如果是你,你怎么實(shí)現(xiàn)日志收集轉(zhuǎn)發(fā)?

# /container/container.go:312
func (container *Container) StartLogger(cfg containertypes.LogConfig) (logger.Logger, error) {
	c, err := logger.GetLogDriver(cfg.Type)
	if err != nil {
		return nil, fmt.Errorf("Failed to get logging factory: %v", err)
	}
	ctx := logger.Context{
		Config:              cfg.Config,
		ContainerID:         container.ID,
		ContainerName:       container.Name,
		ContainerEntrypoint: container.Path,
		ContainerArgs:       container.Args,
		ContainerImageID:    container.ImageID.String(),
		ContainerImageName:  container.Config.Image,
		ContainerCreated:    container.Created,
		ContainerEnv:        container.Config.Env,
		ContainerLabels:     container.Config.Labels,
		DaemonName:          "docker",
	}

	// Set logging file for "json-logger"
	if cfg.Type == jsonfilelog.Name {
		ctx.LogPath, err = container.GetRootResourcePath(fmt.Sprintf("%s-json.log", container.ID))
		if err != nil {
			return nil, err
		}
	}
	return c(ctx)
}
#/container/container.go:978
func (container *Container) startLogging() error {
	if container.HostConfig.LogConfig.Type == "none" {
		return nil // do not start logging routines
	}

	l, err := container.StartLogger(container.HostConfig.LogConfig)
	if err != nil {
		return fmt.Errorf("Failed to initialize logging driver: %v", err)
	}

	copier := logger.NewCopier(map[string]io.Reader{"stdout": container.StdoutPipe(), "stderr": container.StderrPipe()}, l)
	container.LogCopier = copier
	copier.Run()
	container.LogDriver = l

	// set LogPath field only for json-file logdriver
	if jl, ok := l.(*jsonfilelog.JSONFileLogger); ok {
		container.LogPath = jl.LogPath()
	}

	return nil
}

第一個(gè)方法是為container查找log-driver。首先根據(jù)容器配置的log-driver類別調(diào)用:logger.GetLogDriver(cfg.Type)返回一個(gè)方法類型:

/daemon/logger/factory.go:9
type Creator func(Context) (Logger, error)

實(shí)質(zhì)就是從工廠類注冊的logdriver插件去查找,具體源碼下文分析。獲取到c方法后構(gòu)建調(diào)用參數(shù)具體就是容器的一些信息。然后使用調(diào)用c方法返回driver。driver是個(gè)接口類型,我們看看有哪些方法:

# /daemon/logger/logger.go:61
type Logger interface {
	Log(*Message) error
	Name() string
	Close() error
}

很簡單的三個(gè)方法,也很容易理解,Log()發(fā)送日志消息到driver,Close()進(jìn)行關(guān)閉操作(根據(jù)不同實(shí)現(xiàn))。 也就是說我們自己實(shí)現(xiàn)一個(gè)logdriver,只需要實(shí)現(xiàn)如上三個(gè)方法,然后注冊到logger工廠類中即可。下面我們來看/daemon/logger/factory.go

第二個(gè)方法就是處理日志了,獲取到日志driver,在創(chuàng)建一個(gè)Copier,顧名思義就是復(fù)制日志,分別從stdout 和stderr復(fù)制到logger driver。下面看看具體關(guān)鍵實(shí)現(xiàn):

#/daemon/logger/copir.go:41
func (c *Copier) copySrc(name string, src io.Reader) {
	defer c.copyJobs.Done()
	reader := bufio.NewReader(src)

	for {
		select {
		case <-c.closed:
			return
		default:
			line, err := reader.ReadBytes('\n')
			line = bytes.TrimSuffix(line, []byte{'\n'})

			// ReadBytes can return full or partial output even when it failed.
			// e.g. it can return a full entry and EOF.
			if err == nil || len(line) > 0 {
				if logErr := c.dst.Log(&Message{Line: line, Source: name, Timestamp: time.Now().UTC()}); logErr != nil {
					logrus.Errorf("Failed to log msg %q for logger %s: %s", line, c.dst.Name(), logErr)
				}
			}

			if err != nil {
				if err != io.EOF {
					logrus.Errorf("Error scanning log stream: %s", err)
				}
				return
			}
		}
	}
}

每讀取一行數(shù)據(jù),構(gòu)建一個(gè)消息,調(diào)用logdriver的log方法發(fā)送到driver處理。

日志driver注冊器

位于/daemon/logger/factory.go的源碼實(shí)現(xiàn)即時(shí)日志driver的注冊器,其中幾個(gè)重要的方法(上文已經(jīng)提到一個(gè)):

# /daemon/logger/factory.go:21
func (lf *logdriverFactory) register(name string, c Creator) error {
	if lf.driverRegistered(name) {
		return fmt.Errorf("logger: log driver named '%s' is already registered", name)
	}

	lf.m.Lock()
	lf.registry[name] = c
	lf.m.Unlock()
	return nil
}
# /daemon/logger/factory.go:39
func (lf *logdriverFactory) registerLogOptValidator(name string, l LogOptValidator) error {
	lf.m.Lock()
	defer lf.m.Unlock()

	if _, ok := lf.optValidator[name]; ok {
		return fmt.Errorf("logger: log validator named '%s' is already registered", name)
	}
	lf.optValidator[name] = l
	return nil
}

看起來很簡單,就是將一個(gè)Creator方法類型添加到一個(gè)map結(jié)構(gòu)中,將LogOptValidator添加到另一個(gè)map這里注意加鎖的操作。

#/daemon/logger/factory.go:13
type LogOptValidator func(cfg map[string]string) error

這個(gè)主要是驗(yàn)證driver的參數(shù) ,dockerd和docker啟動參數(shù)中有:--log-opt

好雨云幫自己實(shí)現(xiàn)一個(gè)基于zmq的log-driver

上文已經(jīng)完整分析了docker daemon管理logdriver和處理日志的整個(gè)流程。相信你已經(jīng)比較明白了。下面我們以zmq-driver為例講講我們怎么實(shí)現(xiàn)自己的driver。直接接收容器的日志。
上文我們已經(jīng)談了一個(gè)log-driver需要實(shí)現(xiàn)的幾個(gè)方法。 我們可以看看位于/daemon/logger目錄下的已有的driver的實(shí)現(xiàn),例如fluentd,awslogs等。 下面我們來分析zmq-driver具體的代碼:

//定義一個(gè)struct,這里包含一個(gè)zmq套接字
type ZmqLogger struct {
	writer      *zmq.Socket
	containerId string
	tenantId    string
	serviceId   string
	felock      sync.Mutex
}
//定義init方法調(diào)用logger注冊器的方法注冊當(dāng)前driver
//和參數(shù)驗(yàn)證方法。
func init() {
	if err := logger.RegisterLogDriver(name, New); err != nil {
		logrus.Fatal(err)
	}
	if err := logger.RegisterLogOptValidator(name, ValidateLogOpt); err != nil {
		logrus.Fatal(err)
	}
}
//實(shí)現(xiàn)一個(gè)上文提到的Creator方法注冊logdriver.
//這里新建一個(gè)zmq套接字構(gòu)建一個(gè)實(shí)例
func New(ctx logger.Context) (logger.Logger, error) {
	zmqaddress := ctx.Config[zmqAddress]

	puber, err := zmq.NewSocket(zmq.PUB)
	if err != nil {
		return nil, err
	}
	var (
		env       = make(map[string]string)
		tenantId  string
		serviceId string
	)
	for _, pair := range ctx.ContainerEnv {
		p := strings.SplitN(pair, "=", 2)
		//logrus.Errorf("ContainerEnv pair: %s", pair)
		if len(p) == 2 {
			key := p[0]
			value := p[1]
			env[key] = value
		}
	}
	tenantId = env["TENANT_ID"]
	serviceId = env["SERVICE_ID"]

	if tenantId == "" {
		tenantId = "default"
	}

	if serviceId == "" {
		serviceId = "default"
	}

	puber.Connect(zmqaddress)

	return &ZmqLogger{
		writer:      puber,
		containerId: ctx.ID(),
		tenantId:    tenantId,
		serviceId:   serviceId,
		felock:      sync.Mutex{},
	}, nil
}
//實(shí)現(xiàn)Log方法,這里使用zmq socket發(fā)送日志消息
//這里必須注意,zmq socket是線程不安全的,我們知道
//本方法可能被兩個(gè)線程(復(fù)制stdout和膚質(zhì)stderr)調(diào)用//必須使用鎖保證線程安全。否則會發(fā)生錯誤。
func (s *ZmqLogger) Log(msg *logger.Message) error {
	s.felock.Lock()
	defer s.felock.Unlock()
	s.writer.Send(s.tenantId, zmq.SNDMORE)
	s.writer.Send(s.serviceId, zmq.SNDMORE)
	if msg.Source == "stderr" {
		s.writer.Send(s.containerId+": "+string(msg.Line), zmq.DONTWAIT)
	} else {
		s.writer.Send(s.containerId+": "+string(msg.Line), zmq.DONTWAIT)
	}
	return nil
}
//實(shí)現(xiàn)Close方法,這里用來關(guān)閉zmq socket。
//同樣注意線程安全,調(diào)用此方法的是容器關(guān)閉協(xié)程。
func (s *ZmqLogger) Close() error {
	s.felock.Lock()
	defer s.felock.Unlock()
	if s.writer != nil {
		return s.writer.Close()
	}
	return nil
}

func (s *ZmqLogger) Name() string {
	return name
}
//驗(yàn)證參數(shù)的方法,我們使用參數(shù)傳入zmq pub的地址。
func ValidateLogOpt(cfg map[string]string) error {
	for key := range cfg {
		switch key {
		case zmqAddress:
		default:
			return fmt.Errorf("unknown log opt '%s' for %s log driver", key, name)
		}
	}
	if cfg[zmqAddress] == "" {
		return fmt.Errorf("must specify a value for log opt '%s'", zmqAddress)
	}
	return nil
}

關(guān)于如何理解Docker中的容器日志處理與log-driver實(shí)現(xiàn)就分享到這里了,希望以上內(nèi)容可以對大家有一定的幫助,可以學(xué)到更多知識。如果覺得文章不錯,可以把它分享出去讓更多的人看到。

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI