溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

Go怎么批量操作excel導(dǎo)入到mongodb中

發(fā)布時(shí)間:2022-03-24 14:04:56 來(lái)源:億速云 閱讀:138 作者:iii 欄目:開(kāi)發(fā)技術(shù)

本文小編為大家詳細(xì)介紹“Go怎么批量操作excel導(dǎo)入到mongodb中”,內(nèi)容詳細(xì),步驟清晰,細(xì)節(jié)處理妥當(dāng),希望這篇“Go怎么批量操作excel導(dǎo)入到mongodb中”文章能幫助大家解決疑惑,下面跟著小編的思路慢慢深入,一起來(lái)學(xué)習(xí)新知識(shí)吧。

    需求:完成一個(gè)命令工具,批量處理某個(gè)目錄下面的一些excel,將這些excel數(shù)據(jù)導(dǎo)入到mongodb,同時(shí)可以同步到mysql

    代碼目錄:

    ├─cmd
    |  └─ecc.go     # 命令
    ├─configs
    ├─data
    ├─internal
    │  └─importing  # 主要邏輯處理
    ├─pkg           # 處理文件讀取、連接數(shù)據(jù)庫(kù)等
    │  ├─files
    │  ├─mongo
    │  └─mysql
    ├─queue
    └─tools

    1. 選擇命令行包

    平常使用的的命令工具包有:

    • urfave/cli

    • spf13/cobra

    這里使用的是urfave/cli包,比較簡(jiǎn)單

    var DirPath = "../data"     // 默認(rèn)位置
    var dir = DirPath
    app := &cli.App{
    		Name:  "Ecc",
    		Usage: "Ecc is a tools for batch processing of excel data",
    		Flags: []cli.Flag{
    			&cli.StringFlag{
    				Name:        "model",
    				Aliases:     []string{"m"},
    				Usage:       "The model of searching",
    				Value:       "model",
    				Destination: &model,
    			},
    			&cli.StringFlag{    // 設(shè)置一個(gè) -d 的參數(shù),用來(lái)確定目標(biāo)文件夾位置
    				Name:        "dir",
    				Aliases:     []string{"d"},
    				Usage:       "Folder location of data files",
    				Destination: &dir,
    				Value:       DirPath,
    		},
    		Action: func(c *cli.Context) error {
    			importing.Load("../configs/cfg.yaml")  // 引入配置文件,讀取mongodb、mysql等配置
    			importing.Handle(dir)  ## 具體邏輯處理
    			return nil
    	}

    2. 讀取配置,連接數(shù)據(jù)庫(kù)

    讀取配置使用spf13/viper庫(kù),需要讀取一下配置,連接mongodb

    var C Config
    
    type Config struct {
    	Env   string `yaml:"env"`
    	Mongo struct {
    		DNS        string `yaml:"dns"`
    		Db         string `yaml:"db"`
    		Collection string `yaml:"collection"`
    	} `yaml:"mongo"`
    	Mysql struct {
    		Alias string `yaml:"alias"`
    		Dns   string `yaml:"dns"`
    	} `yaml:"mysql"`
    }
    func Load(cf string) {
    	var err error
    	viper.SetConfigFile(cf)
    	if err = viper.ReadInConfig(); err != nil {
    		log.Fatal(fmt.Errorf("fatal error config file: %s \n", err))
    	}
    	if err = viper.Unmarshal(&configs.C); err != nil {
    		log.Fatal(fmt.Errorf("unmarshal conf failed, err:%s \n", err))
    	if err = mongo.Conn(configs.C.Mongo.DNS, configs.C.Mongo.Db); err != nil {
    		log.Fatal(color.RedString("%s:\n%v", "mongo connect err", err))
    	if mongo.CheckCollection(configs.C.Mongo.Collection) {
    		if err = mongo.DelCollection(configs.C.Mongo.Collection); err != nil {
    			log.Fatal(color.RedString("%s:\n%v", "mongo del collection err", err))
    		}
    	if err = mongo.CreateCollection(configs.C.Mongo.Collection); err != nil {
    		log.Fatal(color.RedString("%s:\n%v", "mongo create collection err", err))

    3. 讀取文件

    先確定文件權(quán)限以及文件是否存在

    func ReadDir(dir string) ([]os.FileInfo, error) {
    	perm := checkPermission(dir)
    	if perm == true {
    		return nil, fmt.Errorf("permission denied dir: %s", dir)
    	}
    
    	if isNotExistDir(dir) {
    		return nil, fmt.Errorf("does not exist dir: %s", dir)
    	files, err := ioutil.ReadDir(dir)
    	if err == nil {
    		return files, err
    	return nil, fmt.Errorf("ReadDir: %s, err: %v", dir, err)
    }

    拿到文件后就要并發(fā)讀取每個(gè)excel文件數(shù)據(jù)

    這里需求是一次任務(wù)必須讀完所有的文件,任何一個(gè)文件有錯(cuò)誤就退出程序。

    :: 所以需要定義異常退出信道和一個(gè)完成讀取兩個(gè)信道,總的數(shù)據(jù)使用sync.Map安全并發(fā)寫入。

    3.1. 并發(fā)讀

    rWait   = true
    rDone   = make(chan struct{})
    rCrash  = make(chan struct{})
    
    read(f, dir, data)
    for rWait {  		// 使用for循環(huán)來(lái)阻塞讀文件
    	select {
    	case <-rCrash:
    		abort("-> Failure")
    		return
    	case <-rDone:
    		rWait = false
    	}
    }
    func read(fs []os.FileInfo, dir string, data *sync.Map) {
    	for _, file := range fs {
    		fileName := file.Name()
    		_ext := filepath.Ext(fileName)
    		if Include(strings.Split(Exts, ","), _ext) {
    			wg.Add(1)
    			inCh := make(chan File)
    			go func() {
    				defer wg.Done()
    				select {
    				case <-rCrash:
    					return // 退出goroutine
    				case f := <-inCh:
    					e, preData := ReadExcel(f.FilePath, f.FileName, pb)
    					if e != nil {
    						tools.Red("%v", e)
    						// 使用sync.once防止多個(gè)goroutine關(guān)閉同一個(gè)信道
    						once.Do(func() { 
    							close(rCrash)
    						})
    						return
    					}
    					data.Store(f.FileName, preData)
    				}
    			}()
    				inCh <- File{
    					FileName: fileName,
    					FilePath: dir + string(os.PathSeparator) + fileName,
    		}
    	go func() {
    		wg.Wait()
    		close(rDone)
    	}()

    3.2. 使用excelize處理excel

    excelize是一個(gè)非常好用的excel處理庫(kù),這里使用這個(gè)庫(kù)讀取excel文件內(nèi)容

    type ExcelPre struct {
    	FileName    string
    	Data        [][]string
    	Fields      []string
    	Prefixes    string
    	ProgressBar *mpb.Bar  // 進(jìn)度條對(duì)象
    }
    
    func ReadExcel(filePath, fileName string, pb *mpb.Progress) (err error, pre *ExcelPre) {
    	f, err := excelize.OpenFile(filePath)
    	if err != nil {
    		return err, nil
    	}
    	defer func() {
    		if _e := f.Close(); _e != nil {
    			fmt.Printf("%s: %v.\n\n", filePath, _e)
    		}
    	}()
    	// 獲取第一頁(yè)數(shù)據(jù)
    	firstSheet := f.WorkBook.Sheets.Sheet[0].Name
    	rows, err := f.GetRows(firstSheet)
    	lRows := len(rows)
    	if lRows < 2 {
    		lRows = 2
    	rb := ReadBar(lRows, filePath, pb)
    	wb := WriteBar(lRows-2, filePath, rb, pb)
    	var fields []string
    	var data [][]string
            // 進(jìn)度條增加一格
    	InCr := func(start time.Time) {
    		rb.Increment()
    		rb.DecoratorEwmaUpdate(time.Since(start))
    	for i := 0; i < lRows; i++ {
    		InCr(time.Now())
    		// 這里對(duì)第一行處理,用來(lái)判斷一些約定的條件
    		if i == 0 {
    			fields = rows[i]
    			for index, field := range fields {
    				if isChinese := regexp.MustCompile("[\u4e00-\u9fa5]"); isChinese.MatchString(field) || field == "" {
    					err = errors.New(fmt.Sprintf("%s: line 【A%d】 field 【%s】 \n", filePath, index, field) + "The first line of the file is not a valid attribute name.")
    					return err, nil
    				}
    			}
    			continue
    		// 過(guò)濾第二行,這一行通常是中文解釋字段
    		if i == 1 {
    		data = append(data, rows[i])
    	return nil, &ExcelPre{
    		FileName:    fileName,
    		Data:        data,
    		Fields:      fields,
    		Prefixes:    Prefix(fileName),
    		ProgressBar: wb,

    3.3. 使用mpb在命令行輸出進(jìn)度顯示

    mpb是一個(gè)很好用的命令行進(jìn)度輸出庫(kù),上面代碼里里有兩個(gè)進(jìn)度條,一個(gè)是讀進(jìn)度條,第二個(gè)是寫進(jìn)度條,讀進(jìn)度條在文件讀取的時(shí)候就顯示了,返回的結(jié)構(gòu)體里有寫進(jìn)度條對(duì)象,便于后面寫操作時(shí)候顯示。

    下面是兩個(gè)進(jìn)度條顯示的配置,具體參數(shù)可以看這個(gè)庫(kù)的文檔。

    func ReadBar(total int, name string, pb *mpb.Progress) *mpb.Bar {
    	return pb.AddBar(int64(total),
    		mpb.PrependDecorators(
    			decor.OnComplete(decor.Name(color.YellowString("reading"), decor.WCSyncSpaceR), color.YellowString("waiting")),
    			decor.CountersNoUnit("%d / %d", decor.WCSyncWidth, decor.WCSyncSpaceR),
    		),
    		mpb.AppendDecorators(
    			decor.NewPercentage("%.2f:", decor.WCSyncSpaceR),
    			decor.EwmaETA(decor.ET_STYLE_MMSS, 0, decor.WCSyncWidth),
    			decor.Name(": "+name),
    	)
    }
    
    func WriteBar(total int, name string, beforeBar *mpb.Bar, pb *mpb.Progress) *mpb.Bar {
    		mpb.BarQueueAfter(beforeBar, false),
    		mpb.BarFillerClearOnComplete(),
    			decor.OnComplete(decor.Name(color.YellowString("writing"), decor.WCSyncSpaceR), color.GreenString("done")),
    			decor.OnComplete(decor.CountersNoUnit("%d / %d", decor.WCSyncSpaceR), ""),
    			decor.OnComplete(decor.NewPercentage("%.2f:", decor.WCSyncSpaceR), ""),
    			decor.OnComplete(decor.EwmaETA(decor.ET_STYLE_MMSS, 0, decor.WCSyncWidth), ""),
    			decor.OnComplete(decor.Name(": "+name), name),

    4. 寫入mongodb

    同寫入操作,這里拿到所有數(shù)據(jù),然后使用goroutine并發(fā)寫入mongodb,在處理數(shù)據(jù)時(shí)候需要查重,還需要記錄一下本次操作插入了哪些數(shù)據(jù)的_id值,在報(bào)錯(cuò)的時(shí)候進(jìn)行刪除(這里可以使用事務(wù),直接刪除簡(jiǎn)單些),所以定義了一個(gè)Shuttle結(jié)構(gòu)體用來(lái)在記錄并發(fā)時(shí)的數(shù)據(jù)。

    wWait   = true
    wDone   = make(chan struct{})
    wCrash  = make(chan struct{})
    
    type Shuttle struct {
    	Hid []string  	// 用來(lái)判斷是否是重復(fù)數(shù)據(jù)
    	Mid []string  	// 用來(lái)記錄本次插入的數(shù)據(jù)_id
    	mu  sync.Mutex
    }
    func (s *Shuttle) Append(t string, str string) {
    	s.mu.Lock()
    	defer s.mu.Unlock()
    	switch t {
    	case "h":
    		s.Hid = append(s.Hid, str)
    	case "m":
    		s.Mid = append(s.Mid, str)
    	}
    write2mongo(data)
    for wWait {
    	select {
    	case <-wCrash:
    		abort("-> Failure")
    		return
    	case <-wDone:
    		wWait = false
    func write2mongo(data *sync.Map) {
    	collection := mongo.GetCollection(configs.C.Mongo.Collection)
    	data.Range(func(key, value interface{}) bool {
    		if v, ok := value.(*ExcelPre); ok {
    			wg.Add(1)
    			inCh := make(chan []bson.M)
    			go func() {
    				defer wg.Done()
    				select {
    				case <-wCrash:
    					return // exit
    				case rows := <-inCh:
    					e := Write2Mongo(rows, collection, v, &shuttle)
    					if e != nil {
    						tools.Red("%v", e)
    						once.Do(func() {
    							close(wCrash)
    						})
    						return
    					}
    				}
    			}()
    				inCh <- PreWrite(v)
    		}
    		return true
    	})
    	go func() {
    		wg.Wait()
    		close(wDone)
    	}()
    // 具體處理邏輯
    func Write2Mongo(rows []bson.M, collection *mongoDb.Collection, v *ExcelPre, s *Shuttle) error {
    	v.ProgressBar.SetCurrent(0)
    	incr := func(t time.Time, b *mpb.Bar, n int64) {
    		b.IncrInt64(n)
    		b.DecoratorEwmaUpdate(time.Since(t))
    	for _, row := range rows {
    		start := time.Now()
    		key := v.Prefixes + "@@" + row["_hid"].(string)
    		s.mu.Lock()
    		if Include(s.Hid, key) {
    			s.mu.Unlock()
    			incr(start, v.ProgressBar, 1)
    			continue
    		} else {
    			s.Hid = append(s.Hid, key)
    		var err error
    		var id primitive.ObjectID
    		if id, err = mongo.CreateDocs(collection, row); err != nil {
    			return errors.New(fmt.Sprintf("%s:\n%v", "mongo create docs err", err))
    		s.Append("m", id.Hex())
    		incr(start, v.ProgressBar, 1)
    	return nil

    5. 同步mysql

    因?yàn)橥絤ysql不是必要的,這里使用命令行輸入進(jìn)行判斷:

    tools.Yellow("-> Whether to sync data to mysql? (y/n)")
    if !tools.Scan("aborted") {
    	return
    } else {
    	tools.Yellow("-> Syncing data to mysql...")
    	if err = write2mysql(); err != nil {
    		tools.Red("-> Failure:" + err.Error())
    	} else {
    		tools.Green("-> Success.")
    	}
    }

    連接mysql數(shù)據(jù)庫(kù),拿到當(dāng)前monogodb的數(shù)據(jù):

    func write2mysql() error {
    	if err := mysql.Conn(configs.C.Mysql.Dns); err != nil {
    		return err
    	}
    
    	d, err := mongo.GetCollectionAllData(configs.C.Mongo.Collection)
    	if err != nil {
    	err = Write2Mysql(d)
    	return err
    }

    創(chuàng)建表,直接拼sql就完事了:

    func CreateTable(tableName string, fields []string) error {
    	var err error
    	delSql := fmt.Sprintf("DROP TABLE IF EXISTS `%s`", tableName)
    	err = Db.Exec(delSql).Error
    	if err != nil {
    		return err
    	}
    
    	s := "id bigint(20) NOT NULL PRIMARY KEY"
    	for _, field := range fields {
    		s += fmt.Sprintf(",%s varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL", field)
    	sql := fmt.Sprintf("CREATE TABLE `%s` (%s) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci", tableName, s)
    	err = Db.Exec(sql).Error
    	return nil
    }

    插入數(shù)據(jù),bson.M本身就是一個(gè)map,轉(zhuǎn)一下使用gorm分批插入數(shù)據(jù),速度快一點(diǎn):

    func InsertData(tableName string, fields []string, data []bson.M) error {
    	var err error
    	var maps []map[string]interface{}
    	for _, d := range data {
    		row := make(map[string]interface{})
    		for _, field := range fields {
    			row[field] = d[field]
    		}
    		if row != nil {
    			row["id"] = d["id"].(string)
    			maps = append(maps, row)
    	}
    
    	if len(maps) > 0 {
    		err = Db.Table(tableName).CreateInBatches(maps, 100).Error
    		if err != nil {
    			return err
    	return err
    }

    讀到這里,這篇“Go怎么批量操作excel導(dǎo)入到mongodb中”文章已經(jīng)介紹完畢,想要掌握這篇文章的知識(shí)點(diǎn)還需要大家自己動(dòng)手實(shí)踐使用過(guò)才能領(lǐng)會(huì),如果想了解更多相關(guān)內(nèi)容的文章,歡迎關(guān)注億速云行業(yè)資訊頻道。

    向AI問(wèn)一下細(xì)節(jié)

    免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

    AI