您好,登錄后才能下訂單哦!
今天就跟大家聊聊有關(guān)K8S中如何利用Exec Websocket接口實(shí)現(xiàn)Pod間的文件拷貝,可能很多人都不太了解,為了讓大家更加了解,小編給大家總結(jié)了以下內(nèi)容,希望大家根據(jù)這篇文章可以有所收獲。
需求
想想咱們遇到以下問(wèn)題一般怎么解決?
新建了一個(gè)Pod, 想把另外一個(gè)Pod中的文件拷貝到新Pod中進(jìn)行分析, 怎么實(shí)現(xiàn)呢?
如何在項(xiàng)目中, 如何像kubectl cp拷貝文件一樣, 實(shí)現(xiàn)Pod間文件拷貝呢?
新Pod與實(shí)例Pod共享pvc? 或者封裝一個(gè)帶認(rèn)證上下文的kubectl執(zhí)行命令行?
簡(jiǎn)介
流程說(shuō)明
首先初始化信號(hào)通道, 用于協(xié)程間的信號(hào)通知, 收到信號(hào)的協(xié)程執(zhí)行暫停/退出循環(huán)/關(guān)閉通道等操作
初始化數(shù)據(jù)通道srcStdOutCh, 類(lèi)型為字節(jié)數(shù)組[]byte, 用于將源Pod的標(biāo)準(zhǔn)輸出放入通道, 發(fā)送給目的Pod標(biāo)準(zhǔn)輸入的數(shù)據(jù)就是從該數(shù)據(jù)通道中讀取
拼接exec接口的訪問(wèn)地址(集群連接,token), tar壓縮命令, 標(biāo)準(zhǔn)輸入/輸出,tty, pod名,容器名等參數(shù). tar czf - /var/log/xxx.log 表示將該文件樹(shù)結(jié)構(gòu)壓縮為數(shù)據(jù)流
調(diào)用websocket的Dialer方法與源Pod容器建立websocket連接, 并開(kāi)啟協(xié)程將標(biāo)準(zhǔn)輸出寫(xiě)入數(shù)據(jù)通道srcStdOutCh
參考源pod exec接口, 拼接目的Pod exec訪問(wèn)連接, tar xzf - -C /tmp表示從標(biāo)準(zhǔn)輸入讀取數(shù)據(jù)流, 并解壓成文件樹(shù)結(jié)構(gòu)(注意:解壓后包含文件目錄樹(shù)結(jié)構(gòu))
與目的Pod建立wss連接, 開(kāi)啟協(xié)程從數(shù)據(jù)通道srcStdOutCh中讀取源Pod標(biāo)準(zhǔn)輸出, 并寫(xiě)入目的Pod的標(biāo)準(zhǔn)輸入, 如果從數(shù)據(jù)通道讀取超時(shí),則表示數(shù)據(jù)已經(jīng)傳輸完畢, 此時(shí)停止向目的容器輸入數(shù)據(jù), 并發(fā)送通知信號(hào), 通知主協(xié)程可以退出,關(guān)閉源Pod的wss連接
注意事項(xiàng)
wesocket連上源Pod時(shí), 標(biāo)準(zhǔn)輸出中會(huì)輸出空數(shù)據(jù), tar命令輸出等干擾數(shù)據(jù), 所以接收數(shù)據(jù)的時(shí)候需要傳入一個(gè)過(guò)濾器回調(diào)函數(shù), 用于數(shù)據(jù)過(guò)濾
向目的容器發(fā)送數(shù)據(jù)時(shí), 需要將源容器收到的第一個(gè)字節(jié)刪除, 一般為1, 表示標(biāo)準(zhǔn)輸出標(biāo)識(shí), 發(fā)送給目的容器是不需要該字節(jié)的
發(fā)送數(shù)據(jù)時(shí), 需要設(shè)置第一個(gè)字節(jié)為0, 表示發(fā)送到標(biāo)準(zhǔn)輸入
參考代碼
cp.go
/* 總結(jié): 1.不帶緩沖的通道需要先讀后寫(xiě) 2.websocket ReadMessage方法是阻塞讀取的, 如果要中斷讀取, 關(guān)閉連接, 捕獲錯(cuò)誤即可 */ package cpFilePod2Pod import ( "crypto/tls" "errors" "fmt" "log" "net/url" "regexp" "strings" "sync" "time" "github.com/gorilla/websocket" ) // 定義過(guò)濾器回調(diào)函數(shù) type filterCallback func(input string) bool // 帶有互斥鎖的Websocket連接對(duì)象 type WsConn struct { Conn *websocket.Conn mu sync.Mutex } // 發(fā)送字符串, 自動(dòng)添加換行符 func (self *WsConn) Send(sender string, str string) { self.mu.Lock() defer self.mu.Unlock() // 利用k8s exec websocket接口發(fā)送數(shù)據(jù)時(shí), 第一個(gè)字節(jié)需要設(shè)置為0, 表示將數(shù)據(jù)發(fā)送到標(biāo)準(zhǔn)輸入 data := []byte{0} data = append(data, []byte(str+"\n")...) err := self.Conn.WriteMessage(websocket.BinaryMessage, data) //發(fā)送二進(jìn)制數(shù)據(jù)類(lèi)型 if err != nil { log.Printf("發(fā)送錯(cuò)誤, %s", err.Error()) } log.Printf("%s, 數(shù)據(jù):%s, 字節(jié):%+v", sender, str, []byte(str+"\n")) } //發(fā)送字符串, 不添加換行符, 內(nèi)部做字節(jié)過(guò)濾,等操作 func (self *WsConn) SendWithFilter(sender string, str string) { self.mu.Lock() defer self.mu.Unlock() // log.Printf("向目的容器發(fā)送數(shù)據(jù):%s", str) str = strings.ReplaceAll(str, "\r\n", "\n") // /r=13, /n=10, windows換行符轉(zhuǎn)Linux換行符 //去掉第一個(gè)字節(jié)(標(biāo)準(zhǔn)輸出1, byte:[0 1 ...]), 因?yàn)閺脑慈萜鬏敵龅淖止?jié)中, 第一位標(biāo)識(shí)了標(biāo)準(zhǔn)輸出1, 給目的容器發(fā)送字節(jié)時(shí), 需要去除該標(biāo)志 //當(dāng)WebSocket建立連接后,發(fā)送數(shù)據(jù)時(shí)需要在字節(jié)Buffer第一個(gè)字節(jié)設(shè)置為stdin(buf[0] = 0),而接受數(shù)據(jù)時(shí), 需要判斷第一個(gè)字節(jié), stdout(buf[0] = 1)或stderr(buf[0] = 2) strByte := append([]byte(str)[:0], []byte(str)[1:]...) data := []byte{0} data = append(data, strByte...) err := self.Conn.WriteMessage(websocket.BinaryMessage, data) log.Printf("向目的容器標(biāo)準(zhǔn)輸入發(fā)送數(shù)據(jù):\n%s, 字節(jié)數(shù):%d, 字節(jié):%+v", string(data), len(data), data) if err != nil { log.Printf("發(fā)送錯(cuò)誤, %s", err.Error()) } } //從連接中獲取數(shù)據(jù)流, 并寫(xiě)入字節(jié)數(shù)組通道中, 內(nèi)部執(zhí)行過(guò)濾器(回調(diào)函數(shù)) func (self *WsConn) Receive(receiver string, ch chan []byte, filter filterCallback) error { self.mu.Lock() defer self.mu.Unlock() msgType, msgByte, err := self.Conn.ReadMessage() //阻塞讀取, 類(lèi)型為2表示二進(jìn)制數(shù)據(jù), 1表示文本, -1表示連接已關(guān)閉:websocket: close 1000 (normal) log.Printf("%s, 讀取到數(shù)據(jù):%s, 類(lèi)型:%d, 字節(jié)數(shù):%d, 字節(jié):%+v", receiver, string(msgByte), msgType, len(msgByte), msgByte) if err != nil { log.Printf("%s, 讀取出錯(cuò), %s", receiver, err.Error()) return err } if filter(string(msgByte)) && len(msgByte) > 1 { ch <- msgByte } else { log.Printf("%s, 數(shù)據(jù)不滿(mǎn)足, 直接丟棄數(shù)據(jù), 字符:%s, 字節(jié)數(shù):%d, 字節(jié):%v", receiver, string(msgByte), len(msgByte), msgByte) } return nil } func NewWsConn(host string, path string, params map[string]string, headers map[string][]string) (*websocket.Conn, error) { paramArray := []string{} for k, v := range params { paramArray = append(paramArray, fmt.Sprintf("%s=%s", k, v)) } u := url.URL{Scheme: "wss", Host: host, Path: path, RawQuery: strings.Join(paramArray, "&")} log.Printf("API:%s", u.String()) dialer := websocket.Dialer{TLSClientConfig: &tls.Config{RootCAs: nil, InsecureSkipVerify: true}} conn, _, err := dialer.Dial(u.String(), headers) if err != nil { return nil, errors.New(fmt.Sprintf("連接錯(cuò)誤:%s", err.Error())) } return conn, nil } //核心: tar -cf - 將具有文件夾結(jié)構(gòu)的數(shù)據(jù)轉(zhuǎn)換成數(shù)據(jù)流, 通過(guò) tar -xf - 將數(shù)據(jù)流轉(zhuǎn)換成 linux 文件系統(tǒng) func CpPod2Pod() { //通知主函數(shù)可以退出的信號(hào)通道 signalExit := make(chan bool, 1) defer close(signalExit) //下發(fā)不要給目的容器發(fā)送數(shù)據(jù)的信號(hào) signalStopDstSend := make(chan bool, 1) defer close(signalStopDstSend) //下發(fā)不要從源容器讀取數(shù)據(jù)的信號(hào) signalStopSrcRead := make(chan bool, 1) defer close(signalStopSrcRead) //下發(fā)不要從目的容器讀取數(shù)據(jù)的信號(hào) signalStopDstRead := make(chan bool, 1) defer close(signalStopDstRead) //下發(fā)不要打印目的容器的輸出數(shù)據(jù) signalStopPrintDstStdout := make(chan bool, 1) defer close(signalStopPrintDstStdout) //連接pod host := "172.16.xxx.xxx:6443" token := "xxx" headers := map[string][]string{"authorization": {fmt.Sprintf("Bearer %s", token)}} pathSrc := "/api/v1/namespaces/xxx/pods/xxx/exec" commandSrc := "tar&command=czf&command=-&command=/var/log/mysql/slow.log" //tar czf - sourceFile paraSrc := map[string]string{"stdout": "1", "stdin": "0", "stderr": "1", "tty": "0", "container": "xxx", "command": commandSrc} srcConn, err := NewWsConn(host, pathSrc, paraSrc, headers) if err != nil { log.Printf("源Pod連接出錯(cuò), %s", err.Error()) } pathDst := "/api/v1/namespaces/xxx/pods/xxx/exec" commandDst := "tar&command=xzf&command=-&command=-C&command=/tmp" // tar xzf - -C /tmp // paraDst := map[string]string{"stdout": "1", "stdin": "1", "stderr": "1", "tty": "0", "container": "xxx", "command": commandDst} paraDst := map[string]string{"stdout": "0", "stdin": "1", "stderr": "0", "tty": "0", "container": "xxx", "command": commandDst} //關(guān)閉目的Pod標(biāo)準(zhǔn)輸出和錯(cuò)誤輸出 dstConn, err := NewWsConn(host, pathDst, paraDst, headers) if err != nil { log.Printf("目的Pod連接出錯(cuò), %s", err.Error()) } wsSrc := WsConn{ Conn: srcConn, } wsDst := WsConn{ Conn: dstConn, } defer srcConn.Close() defer dstConn.Close() srcStdOutCh := make(chan []byte, 2048) dstStdOutCh := make(chan []byte) defer close(srcStdOutCh) defer close(dstStdOutCh) // 接收源容器標(biāo)準(zhǔn)輸出到數(shù)據(jù)通道中 go func() { i := 1 for { log.Printf("第%d次, 從源容器讀取標(biāo)準(zhǔn)輸出", i) i++ //定義匿名過(guò)濾器回調(diào)方法, 對(duì)源容器標(biāo)準(zhǔn)輸出中不需要的數(shù)據(jù)進(jìn)行過(guò)濾 err := wsSrc.Receive("源容器", srcStdOutCh, func(input string) bool { if input == "cat /var/log/mysql/slow.log" { return false // } else if match, _ := regexp.MatchString("root@(.+)#", input); match { // return false // } else if match, _ := regexp.MatchString("cat /(.+).log", input); match { // return false // } else if match, _ := regexp.MatchString("cat /tmp/(.+)", input); match { // return false } else if match, _ := regexp.MatchString("tar: Removing leading(.+)", input); match { return false } else if len(input) == 0 { //過(guò)濾空消息 // log.Printf("讀取到標(biāo)準(zhǔn)錯(cuò)誤輸出") return false } return true }) if err != nil { log.Printf("讀取源容器標(biāo)準(zhǔn)輸出失敗") // signalExit <- true break } // time.Sleep(time.Microsecond * 100) } }() /* 注意, 這里不能開(kāi)啟并發(fā)協(xié)程去讀取目的容器的標(biāo)準(zhǔn)輸出, 如果開(kāi)啟可能會(huì)與發(fā)送數(shù)據(jù)的協(xié)程搶鎖, 從而阻塞向目的容器發(fā)送數(shù)據(jù)*/ // // 從目的容器獲取標(biāo)準(zhǔn)輸出到數(shù)據(jù)通道中 // go func() { // // i := 0 // for { // // 該過(guò)濾器直接返回true, 僅占位 // err := wsDst.Receive("目的容器", dstStdOutCh, func(input string) bool { // return true // }) // if err != nil { // log.Printf("從目的容器讀取數(shù)據(jù)失敗") // break // } // // wsDst.Send() // time.Sleep(time.Microsecond * 100000) // } // // log.Printf("從目的容器讀取數(shù)據(jù), 第%d次循環(huán)", i) // // i++ // }() // //從數(shù)據(jù)通道中讀取, 目的容器的標(biāo)準(zhǔn)輸出, 并打印 // go func() { // BreakPrintDstPodStdout: // for { // select { // case data := <-dstStdOutCh: // log.Printf("目的容器標(biāo)準(zhǔn)輸出:%s", string(data)) // // time.Sleep(time.Microsecond * 200) // case <-signalStopPrintDstStdout: // log.Printf("收到信號(hào), 停止打印目的容器標(biāo)準(zhǔn)輸出") // // close(dataOutput) // // close(dataCh) // // signalStopRead <- true // // log.Printf("發(fā)送停止讀信號(hào)") // // close(dataOutput) // // close(dataCh) // break BreakPrintDstPodStdout // } // // time.Sleep(time.Microsecond * 100) // } // }() //從源容器標(biāo)準(zhǔn)輸出的數(shù)據(jù)通道獲取數(shù)據(jù), 然后發(fā)送給目的容器標(biāo)準(zhǔn)輸入 //定義超時(shí)時(shí)間 timeOutSecond := 3 timer := time.NewTimer(time.Second * time.Duration(timeOutSecond)) Break2Main: for { select { case data := <-srcStdOutCh: wsDst.SendWithFilter("向目的容器發(fā)送", string(data)) // time.Sleep(time.Millisecond * 200) timer.Reset(time.Second * time.Duration(timeOutSecond)) case <-timer.C: // time.Sleep(time.Second * 5) log.Printf("================ 源容器標(biāo)準(zhǔn)輸出,沒(méi)有新的數(shù)據(jù),獲取超時(shí),停止向目的容器發(fā)送數(shù)據(jù) ================") // log.Printf("發(fā)送信號(hào):停止打印目的容器標(biāo)準(zhǔn)輸出") // signalStopPrintDstStdout <- true log.Printf("發(fā)送信號(hào):停止從源容器讀取數(shù)據(jù)") wsSrc.Conn.Close() // log.Printf("發(fā)送信號(hào):停止從目的容器讀取數(shù)據(jù)") // wsDst.Conn.Close() log.Printf("發(fā)送信號(hào):主函數(shù)可以退出了") signalExit <- true log.Printf("所有信號(hào)發(fā)送完畢") log.Printf("================== 跳出循環(huán) =================") break Break2Main } // time.Sleep(time.Microsecond * 1000) } // signalStopRead <- true <-signalExit //阻塞通道, 直到收到一個(gè)信號(hào) // signalStopRead <- true log.Printf("主函數(shù)收到信號(hào), 準(zhǔn)備退出") // close(dataCh) // time.Sleep(time.Second) // close(dataOutput) // time.Sleep(time.Second) // select {} }
cp_test.go
package cpFilePod2Pod
import (
"log"
"testing"
)
// go test -race -test.run TestCpPod2Pod 切到該目錄執(zhí)行該測(cè)試
func TestCpPod2Pod(t *testing.T) {
log.Printf("開(kāi)始測(cè)試")
CpPod2Pod()
}
參考結(jié)果: 源容器: root@xxx-mysql-0:/var/log/mysql# md5sum slow.log 16577613b6ea957ecb5d9d5e976d9c50 slow.log 目的容器: root@xxx-75bdcdb8cf-hq9wf:/tmp/var/log/mysql# md5sum slow.log 16577613b6ea957ecb5d9d5e976d9c50 slow.log
看完上述內(nèi)容,你們對(duì)K8S中如何利用Exec Websocket接口實(shí)現(xiàn)Pod間的文件拷貝有進(jìn)一步的了解嗎?如果還想了解更多知識(shí)或者相關(guān)內(nèi)容,請(qǐng)關(guān)注億速云行業(yè)資訊頻道,感謝大家的支持。
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。