溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶(hù)服務(wù)條款》

K8S中如何利用Exec Websocket接口實(shí)現(xiàn)Pod間的文件拷貝

發(fā)布時(shí)間:2021-12-16 10:37:50 來(lái)源:億速云 閱讀:280 作者:柒染 欄目:互聯(lián)網(wǎng)科技

今天就跟大家聊聊有關(guān)K8S中如何利用Exec Websocket接口實(shí)現(xiàn)Pod間的文件拷貝,可能很多人都不太了解,為了讓大家更加了解,小編給大家總結(jié)了以下內(nèi)容,希望大家根據(jù)這篇文章可以有所收獲。

 需求

想想咱們遇到以下問(wèn)題一般怎么解決?

新建了一個(gè)Pod, 想把另外一個(gè)Pod中的文件拷貝到新Pod中進(jìn)行分析, 怎么實(shí)現(xiàn)呢?

如何在項(xiàng)目中, 如何像kubectl cp拷貝文件一樣, 實(shí)現(xiàn)Pod間文件拷貝呢?

新Pod與實(shí)例Pod共享pvc? 或者封裝一個(gè)帶認(rèn)證上下文的kubectl執(zhí)行命令行?

簡(jiǎn)介


K8S中如何利用Exec Websocket接口實(shí)現(xiàn)Pod間的文件拷貝

流程說(shuō)明

  • 首先初始化信號(hào)通道, 用于協(xié)程間的信號(hào)通知, 收到信號(hào)的協(xié)程執(zhí)行暫停/退出循環(huán)/關(guān)閉通道等操作

  • 初始化數(shù)據(jù)通道srcStdOutCh, 類(lèi)型為字節(jié)數(shù)組[]byte, 用于將源Pod的標(biāo)準(zhǔn)輸出放入通道,  發(fā)送給目的Pod標(biāo)準(zhǔn)輸入的數(shù)據(jù)就是從該數(shù)據(jù)通道中讀取

  • 拼接exec接口的訪問(wèn)地址(集群連接,token), tar壓縮命令, 標(biāo)準(zhǔn)輸入/輸出,tty, pod名,容器名等參數(shù). tar czf -  /var/log/xxx.log 表示將該文件樹(shù)結(jié)構(gòu)壓縮為數(shù)據(jù)流

  • 調(diào)用websocket的Dialer方法與源Pod容器建立websocket連接, 并開(kāi)啟協(xié)程將標(biāo)準(zhǔn)輸出寫(xiě)入數(shù)據(jù)通道srcStdOutCh

  • 參考源pod exec接口, 拼接目的Pod exec訪問(wèn)連接, tar xzf - -C /tmp表示從標(biāo)準(zhǔn)輸入讀取數(shù)據(jù)流,  并解壓成文件樹(shù)結(jié)構(gòu)(注意:解壓后包含文件目錄樹(shù)結(jié)構(gòu))

  • 與目的Pod建立wss連接, 開(kāi)啟協(xié)程從數(shù)據(jù)通道srcStdOutCh中讀取源Pod標(biāo)準(zhǔn)輸出, 并寫(xiě)入目的Pod的標(biāo)準(zhǔn)輸入,  如果從數(shù)據(jù)通道讀取超時(shí),則表示數(shù)據(jù)已經(jīng)傳輸完畢, 此時(shí)停止向目的容器輸入數(shù)據(jù), 并發(fā)送通知信號(hào), 通知主協(xié)程可以退出,關(guān)閉源Pod的wss連接

注意事項(xiàng)

  • wesocket連上源Pod時(shí), 標(biāo)準(zhǔn)輸出中會(huì)輸出空數(shù)據(jù), tar命令輸出等干擾數(shù)據(jù), 所以接收數(shù)據(jù)的時(shí)候需要傳入一個(gè)過(guò)濾器回調(diào)函數(shù),  用于數(shù)據(jù)過(guò)濾

  • 向目的容器發(fā)送數(shù)據(jù)時(shí), 需要將源容器收到的第一個(gè)字節(jié)刪除, 一般為1, 表示標(biāo)準(zhǔn)輸出標(biāo)識(shí), 發(fā)送給目的容器是不需要該字節(jié)的

  • 發(fā)送數(shù)據(jù)時(shí), 需要設(shè)置第一個(gè)字節(jié)為0, 表示發(fā)送到標(biāo)準(zhǔn)輸入

參考代碼

cp.go

/* 總結(jié): 1.不帶緩沖的通道需要先讀后寫(xiě) 2.websocket ReadMessage方法是阻塞讀取的, 如果要中斷讀取, 關(guān)閉連接, 捕獲錯(cuò)誤即可 */ package cpFilePod2Pod  import (   "crypto/tls"   "errors"   "fmt"   "log"   "net/url"   "regexp"   "strings"   "sync"   "time"    "github.com/gorilla/websocket" )  // 定義過(guò)濾器回調(diào)函數(shù) type filterCallback func(input string) bool  // 帶有互斥鎖的Websocket連接對(duì)象 type WsConn struct {   Conn *websocket.Conn   mu   sync.Mutex }  // 發(fā)送字符串, 自動(dòng)添加換行符 func (self *WsConn) Send(sender string, str string) {   self.mu.Lock()   defer self.mu.Unlock()   // 利用k8s exec websocket接口發(fā)送數(shù)據(jù)時(shí), 第一個(gè)字節(jié)需要設(shè)置為0, 表示將數(shù)據(jù)發(fā)送到標(biāo)準(zhǔn)輸入   data := []byte{0}   data = append(data, []byte(str+"\n")...)   err := self.Conn.WriteMessage(websocket.BinaryMessage, data) //發(fā)送二進(jìn)制數(shù)據(jù)類(lèi)型   if err != nil {     log.Printf("發(fā)送錯(cuò)誤, %s", err.Error())   }   log.Printf("%s, 數(shù)據(jù):%s, 字節(jié):%+v", sender, str, []byte(str+"\n")) }  //發(fā)送字符串, 不添加換行符, 內(nèi)部做字節(jié)過(guò)濾,等操作 func (self *WsConn) SendWithFilter(sender string, str string) {   self.mu.Lock()   defer self.mu.Unlock()   // log.Printf("向目的容器發(fā)送數(shù)據(jù):%s", str)   str = strings.ReplaceAll(str, "\r\n", "\n") // /r=13, /n=10, windows換行符轉(zhuǎn)Linux換行符   //去掉第一個(gè)字節(jié)(標(biāo)準(zhǔn)輸出1, byte:[0 1 ...]), 因?yàn)閺脑慈萜鬏敵龅淖止?jié)中, 第一位標(biāo)識(shí)了標(biāo)準(zhǔn)輸出1, 給目的容器發(fā)送字節(jié)時(shí), 需要去除該標(biāo)志   //當(dāng)WebSocket建立連接后,發(fā)送數(shù)據(jù)時(shí)需要在字節(jié)Buffer第一個(gè)字節(jié)設(shè)置為stdin(buf[0] = 0),而接受數(shù)據(jù)時(shí), 需要判斷第一個(gè)字節(jié), stdout(buf[0] = 1)或stderr(buf[0] = 2)   strByte := append([]byte(str)[:0], []byte(str)[1:]...)   data := []byte{0}   data = append(data, strByte...)   err := self.Conn.WriteMessage(websocket.BinaryMessage, data)   log.Printf("向目的容器標(biāo)準(zhǔn)輸入發(fā)送數(shù)據(jù):\n%s, 字節(jié)數(shù):%d, 字節(jié):%+v", string(data), len(data), data)   if err != nil {     log.Printf("發(fā)送錯(cuò)誤, %s", err.Error())   } }  //從連接中獲取數(shù)據(jù)流, 并寫(xiě)入字節(jié)數(shù)組通道中, 內(nèi)部執(zhí)行過(guò)濾器(回調(diào)函數(shù)) func (self *WsConn) Receive(receiver string, ch chan []byte, filter filterCallback) error {   self.mu.Lock()   defer self.mu.Unlock()   msgType, msgByte, err := self.Conn.ReadMessage() //阻塞讀取, 類(lèi)型為2表示二進(jìn)制數(shù)據(jù), 1表示文本, -1表示連接已關(guān)閉:websocket: close 1000 (normal)   log.Printf("%s, 讀取到數(shù)據(jù):%s, 類(lèi)型:%d, 字節(jié)數(shù):%d, 字節(jié):%+v", receiver, string(msgByte), msgType, len(msgByte), msgByte)   if err != nil {     log.Printf("%s, 讀取出錯(cuò), %s", receiver, err.Error())     return err   }   if filter(string(msgByte)) && len(msgByte) > 1 {     ch <- msgByte   } else {     log.Printf("%s, 數(shù)據(jù)不滿(mǎn)足, 直接丟棄數(shù)據(jù), 字符:%s, 字節(jié)數(shù):%d, 字節(jié):%v", receiver, string(msgByte), len(msgByte), msgByte)   }   return nil }  func NewWsConn(host string, path string, params map[string]string, headers map[string][]string) (*websocket.Conn, error) {   paramArray := []string{}   for k, v := range params {     paramArray = append(paramArray, fmt.Sprintf("%s=%s", k, v))   }   u := url.URL{Scheme: "wss", Host: host, Path: path, RawQuery: strings.Join(paramArray, "&")}   log.Printf("API:%s", u.String())   dialer := websocket.Dialer{TLSClientConfig: &tls.Config{RootCAs: nil, InsecureSkipVerify: true}}   conn, _, err := dialer.Dial(u.String(), headers)   if err != nil {     return nil, errors.New(fmt.Sprintf("連接錯(cuò)誤:%s", err.Error()))   }   return conn, nil }  //核心: tar -cf - 將具有文件夾結(jié)構(gòu)的數(shù)據(jù)轉(zhuǎn)換成數(shù)據(jù)流, 通過(guò) tar -xf - 將數(shù)據(jù)流轉(zhuǎn)換成 linux 文件系統(tǒng) func CpPod2Pod() {   //通知主函數(shù)可以退出的信號(hào)通道   signalExit := make(chan bool, 1)   defer close(signalExit)    //下發(fā)不要給目的容器發(fā)送數(shù)據(jù)的信號(hào)   signalStopDstSend := make(chan bool, 1)   defer close(signalStopDstSend)    //下發(fā)不要從源容器讀取數(shù)據(jù)的信號(hào)   signalStopSrcRead := make(chan bool, 1)   defer close(signalStopSrcRead)    //下發(fā)不要從目的容器讀取數(shù)據(jù)的信號(hào)   signalStopDstRead := make(chan bool, 1)   defer close(signalStopDstRead)    //下發(fā)不要打印目的容器的輸出數(shù)據(jù)   signalStopPrintDstStdout := make(chan bool, 1)   defer close(signalStopPrintDstStdout)    //連接pod   host := "172.16.xxx.xxx:6443"   token := "xxx"   headers := map[string][]string{"authorization": {fmt.Sprintf("Bearer %s", token)}}    pathSrc := "/api/v1/namespaces/xxx/pods/xxx/exec"   commandSrc := "tar&command=czf&command=-&command=/var/log/mysql/slow.log" //tar czf - sourceFile   paraSrc := map[string]string{"stdout": "1", "stdin": "0", "stderr": "1", "tty": "0", "container": "xxx", "command": commandSrc}   srcConn, err := NewWsConn(host, pathSrc, paraSrc, headers)   if err != nil {     log.Printf("源Pod連接出錯(cuò), %s", err.Error())   }    pathDst := "/api/v1/namespaces/xxx/pods/xxx/exec"   commandDst := "tar&command=xzf&command=-&command=-C&command=/tmp" // tar xzf - -C /tmp   // paraDst := map[string]string{"stdout": "1", "stdin": "1", "stderr": "1", "tty": "0", "container": "xxx", "command": commandDst}   paraDst := map[string]string{"stdout": "0", "stdin": "1", "stderr": "0", "tty": "0", "container": "xxx", "command": commandDst} //關(guān)閉目的Pod標(biāo)準(zhǔn)輸出和錯(cuò)誤輸出   dstConn, err := NewWsConn(host, pathDst, paraDst, headers)   if err != nil {     log.Printf("目的Pod連接出錯(cuò), %s", err.Error())   }    wsSrc := WsConn{     Conn: srcConn,   }    wsDst := WsConn{     Conn: dstConn,   }    defer srcConn.Close()   defer dstConn.Close()    srcStdOutCh := make(chan []byte, 2048)   dstStdOutCh := make(chan []byte)   defer close(srcStdOutCh)   defer close(dstStdOutCh)    // 接收源容器標(biāo)準(zhǔn)輸出到數(shù)據(jù)通道中   go func() {     i := 1     for {       log.Printf("第%d次, 從源容器讀取標(biāo)準(zhǔn)輸出", i)       i++       //定義匿名過(guò)濾器回調(diào)方法, 對(duì)源容器標(biāo)準(zhǔn)輸出中不需要的數(shù)據(jù)進(jìn)行過(guò)濾       err := wsSrc.Receive("源容器", srcStdOutCh, func(input string) bool {         if input == "cat /var/log/mysql/slow.log" {           return false           // } else if match, _ := regexp.MatchString("root@(.+)#", input); match {           //   return false           // } else if match, _ := regexp.MatchString("cat /(.+).log", input); match {           //   return false           // } else if match, _ := regexp.MatchString("cat /tmp/(.+)", input); match {           //   return false         } else if match, _ := regexp.MatchString("tar: Removing leading(.+)", input); match {           return false         } else if len(input) == 0 { //過(guò)濾空消息           // log.Printf("讀取到標(biāo)準(zhǔn)錯(cuò)誤輸出")           return false         }         return true       })       if err != nil {         log.Printf("讀取源容器標(biāo)準(zhǔn)輸出失敗")         // signalExit <- true         break       }       // time.Sleep(time.Microsecond * 100)     }   }()    /* 注意, 這里不能開(kāi)啟并發(fā)協(xié)程去讀取目的容器的標(biāo)準(zhǔn)輸出, 如果開(kāi)啟可能會(huì)與發(fā)送數(shù)據(jù)的協(xié)程搶鎖, 從而阻塞向目的容器發(fā)送數(shù)據(jù)*/   // // 從目的容器獲取標(biāo)準(zhǔn)輸出到數(shù)據(jù)通道中   // go func() {   //   // i := 0   //   for {   //     // 該過(guò)濾器直接返回true, 僅占位   //     err := wsDst.Receive("目的容器", dstStdOutCh, func(input string) bool {   //       return true   //     })   //     if err != nil {   //       log.Printf("從目的容器讀取數(shù)據(jù)失敗")   //       break   //     }   //     // wsDst.Send()   //     time.Sleep(time.Microsecond * 100000)   //   }   //   // log.Printf("從目的容器讀取數(shù)據(jù), 第%d次循環(huán)", i)   //   // i++   // }()    // //從數(shù)據(jù)通道中讀取, 目的容器的標(biāo)準(zhǔn)輸出, 并打印   // go func() {   // BreakPrintDstPodStdout:   //   for {   //     select {   //     case data := <-dstStdOutCh:   //       log.Printf("目的容器標(biāo)準(zhǔn)輸出:%s", string(data))   //       // time.Sleep(time.Microsecond * 200)   //     case <-signalStopPrintDstStdout:   //       log.Printf("收到信號(hào), 停止打印目的容器標(biāo)準(zhǔn)輸出")   //       // close(dataOutput)   //       // close(dataCh)   //       // signalStopRead <- true   //       // log.Printf("發(fā)送停止讀信號(hào)")   //       // close(dataOutput)   //       // close(dataCh)   //       break BreakPrintDstPodStdout   //     }   //     // time.Sleep(time.Microsecond * 100)   //   }   // }()    //從源容器標(biāo)準(zhǔn)輸出的數(shù)據(jù)通道獲取數(shù)據(jù), 然后發(fā)送給目的容器標(biāo)準(zhǔn)輸入   //定義超時(shí)時(shí)間   timeOutSecond := 3   timer := time.NewTimer(time.Second * time.Duration(timeOutSecond)) Break2Main:   for {     select {     case data := <-srcStdOutCh:       wsDst.SendWithFilter("向目的容器發(fā)送", string(data))       // time.Sleep(time.Millisecond * 200)       timer.Reset(time.Second * time.Duration(timeOutSecond))     case <-timer.C:       // time.Sleep(time.Second * 5)       log.Printf("================ 源容器標(biāo)準(zhǔn)輸出,沒(méi)有新的數(shù)據(jù),獲取超時(shí),停止向目的容器發(fā)送數(shù)據(jù) ================")       // log.Printf("發(fā)送信號(hào):停止打印目的容器標(biāo)準(zhǔn)輸出")       // signalStopPrintDstStdout <- true       log.Printf("發(fā)送信號(hào):停止從源容器讀取數(shù)據(jù)")       wsSrc.Conn.Close()       // log.Printf("發(fā)送信號(hào):停止從目的容器讀取數(shù)據(jù)")       // wsDst.Conn.Close()       log.Printf("發(fā)送信號(hào):主函數(shù)可以退出了")       signalExit <- true       log.Printf("所有信號(hào)發(fā)送完畢")       log.Printf("================== 跳出循環(huán) =================")       break Break2Main     }     // time.Sleep(time.Microsecond * 1000)   }    // signalStopRead <- true   <-signalExit //阻塞通道, 直到收到一個(gè)信號(hào)   // signalStopRead <- true   log.Printf("主函數(shù)收到信號(hào), 準(zhǔn)備退出")   // close(dataCh)   // time.Sleep(time.Second)   // close(dataOutput)   // time.Sleep(time.Second)   // select {} }

cp_test.go

  1. package cpFilePod2Pod 

  2.  

  3. import ( 

  4.   "log" 

  5.   "testing" 

  6.  

  7. // go test -race -test.run TestCpPod2Pod  切到該目錄執(zhí)行該測(cè)試 

  8. func TestCpPod2Pod(t *testing.T) { 

  9.   log.Printf("開(kāi)始測(cè)試") 

  10.   CpPod2Pod() 


參考結(jié)果: 源容器: root@xxx-mysql-0:/var/log/mysql# md5sum slow.log 16577613b6ea957ecb5d9d5e976d9c50  slow.log 目的容器: root@xxx-75bdcdb8cf-hq9wf:/tmp/var/log/mysql# md5sum slow.log 16577613b6ea957ecb5d9d5e976d9c50  slow.log

看完上述內(nèi)容,你們對(duì)K8S中如何利用Exec Websocket接口實(shí)現(xiàn)Pod間的文件拷貝有進(jìn)一步的了解嗎?如果還想了解更多知識(shí)或者相關(guān)內(nèi)容,請(qǐng)關(guān)注億速云行業(yè)資訊頻道,感謝大家的支持。

向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI