溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Golang如何處理parquet文件

發(fā)布時間:2023-03-07 11:18:57 來源:億速云 閱讀:99 作者:iii 欄目:開發(fā)技術

這篇文章主要介紹“Golang如何處理parquet文件”,在日常操作中,相信很多人在Golang如何處理parquet文件問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”Golang如何處理parquet文件”的疑惑有所幫助!接下來,請跟著小編一起來學習吧!

前言

Parquet是Apache基金會支持的項目,是面向列存儲二進制文件格式。支持不同類型的壓縮方式,廣泛用于數(shù)據(jù)科學和大數(shù)據(jù)環(huán)境,如Hadoop生態(tài)。

創(chuàng)建結構體

首先創(chuàng)建struct,用于表示要處理的數(shù)據(jù):

type user struct {
  ID        string    `parquet:"name=id, type=BYTE_ARRAY, encoding=PLAIN_DICTIONARY"`
  FirstName string    `parquet:"name=firstname, type=BYTE_ARRAY, encoding=PLAIN_DICTIONARY"`
  LastName  string    `parquet:"name=lastname, type=BYTE_ARRAY, encoding=PLAIN_DICTIONARY"`
  Email     string    `parquet:"name=email, type=BYTE_ARRAY, encoding=PLAIN_DICTIONARY"`
  Phone     string    `parquet:"name=phone, type=BYTE_ARRAY, encoding=PLAIN_DICTIONARY"`
  Blog      string    `parquet:"name=blog, type=BYTE_ARRAY, encoding=PLAIN_DICTIONARY"`
  Username  string    `parquet:"name=username, type=BYTE_ARRAY, encoding=PLAIN_DICTIONARY"`
  Score     float64   `parquet:"name=score, type=DOUBLE"`
  CreatedAt time.Time //wont be saved in the parquet file
}

這里要提醒的是tag,用于說明struct中每個字段在生成parquet過程中如何被處理。

parquet-go包可以處理parquet數(shù)據(jù),更多的tag可以參考其官網(wǎng)。

生成parquet文件

下面現(xiàn)給出生成parquet文件的代碼,然后分別進行說明:

package main

import (
  "fmt"
  "log"
  "time"
  "github.com/bxcodec/faker/v3"
  "github.com/xitongsys/parquet-go-source/local"
  "github.com/xitongsys/parquet-go/parquet"
  "github.com/xitongsys/parquet-go/reader"
  "github.com/xitongsys/parquet-go/writer"
)

type user struct {
  ID        string    `parquet:"name=id, type=BYTE_ARRAY, encoding=PLAIN_DICTIONARY"`
  FirstName string    `parquet:"name=firstname, type=BYTE_ARRAY, encoding=PLAIN_DICTIONARY"`
  LastName  string    `parquet:"name=lastname, type=BYTE_ARRAY, encoding=PLAIN_DICTIONARY"`
  Email     string    `parquet:"name=email, type=BYTE_ARRAY, encoding=PLAIN_DICTIONARY"`
  Phone     string    `parquet:"name=phone, type=BYTE_ARRAY, encoding=PLAIN_DICTIONARY"`
  Blog      string    `parquet:"name=blog, type=BYTE_ARRAY, encoding=PLAIN_DICTIONARY"`
  Username  string    `parquet:"name=username, type=BYTE_ARRAY, encoding=PLAIN_DICTIONARY"`
  Score     float64   `parquet:"name=score, type=DOUBLE"`
  CreatedAt time.Time //wont be saved in the parquet file
}

const recordNumber = 10000

func main() {
  var data []*user
  //create fake data
  for i := 0; i < recordNumber; i++ {
    u := &user{
      ID:        faker.UUIDDigit(),
      FirstName: faker.FirstName(),
      LastName:  faker.LastName(),
      Email:     faker.Email(),
      Phone:     faker.Phonenumber(),
      Blog:      faker.URL(),
      Username:  faker.Username(),
      Score:     float64(i),
      CreatedAt: time.Now(),
    }
    data = append(data, u)
  }
  err := generateParquet(data)
  if err != nil {
    log.Fatal(err)
  }

}

func generateParquet(data []*user) error {
  log.Println("generating parquet file")
  fw, err := local.NewLocalFileWriter("output.parquet")
  if err != nil {
    return err
  }
  //parameters: writer, type of struct, size
  pw, err := writer.NewParquetWriter(fw, new(user), int64(len(data)))
  if err != nil {
    return err
  }
  //compression type
  pw.CompressionType = parquet.CompressionCodec_GZIP
  defer fw.Close()
  for _, d := range data {
    if err = pw.Write(d); err != nil {
      return err
    }
  }
  if err = pw.WriteStop(); err != nil {
    return err
  }
  return nil
}

定義結構體上面已經(jīng)說明,但需要提醒的是類型與文檔保持一致:

Primitive TypeGo Type
BOOLEANbool
INT32int32
INT64int64
INT96(deprecated)string
FLOATfloat32
DOUBLEfloat64
BYTE_ARRAYstring
FIXED_LEN_BYTE_ARRAYstring

接著就是使用faker包生成模擬數(shù)據(jù)。然后調用err := generateParquet(data)方法。該方法大概邏輯為:

  • 首先準備輸出文件,然后基于本地輸出文件構造pw,用于寫parquet數(shù)據(jù):

  fw, err := local.NewLocalFileWriter("output.parquet")
  if err != nil {
    return err
  }
  //parameters: writer, type of struct, size
  pw, err := writer.NewParquetWriter(fw, new(user), int64(len(data)))
  if err != nil {
    return err
  }

  //compression type
  pw.CompressionType = parquet.CompressionCodec_GZIP
  defer fw.Close()

然后設置壓縮類型,并通過defer操作確保關閉文件。下面開始寫數(shù)據(jù):

  for _, d := range data {
    if err = pw.Write(d); err != nil {
      return err
    }
  }
  if err = pw.WriteStop(); err != nil {
    return err
  }
  return nil

循環(huán)寫數(shù)據(jù),最后調用pw.WriteStop()停止寫。 成功寫文件后,下面介紹如何讀取parquet文件。

讀取parquet文件

首先介紹如何一次性讀取文件,主要用于讀取較小的文件:

func readParquet() ([]*user, error) {
  fr, err := local.NewLocalFileReader("output.parquet")
  if err != nil {
    return nil, err
  }

  pr, err := reader.NewParquetReader(fr, new(user), recordNumber)
  if err != nil {
    return nil, err
  }

  u := make([]*user, recordNumber)
  if err = pr.Read(&u); err != nil {
    return nil, err
  }
  pr.ReadStop()
  fr.Close()
  return u, nil
}

大概流程如下:首先定義本地文件,然后構造pr用于讀取parquet文件:

  fr, err := local.NewLocalFileReader("output.parquet")
  if err != nil {
    return nil, err
  }

  pr, err := reader.NewParquetReader(fr, new(user), recordNumber)
  if err != nil {
    return nil, err
  }

然后定義目標內容容器u,一次性讀取數(shù)據(jù):

  u := make([]*user, recordNumber)
  if err = pr.Read(&u); err != nil {
    return nil, err
  }
  pr.ReadStop()
  fr.Close()

但一次性大量記錄加載至內存可能有問題。這是官方文檔提示:

If the parquet file is very big (even the size of parquet file is small, the uncompressed size may be very large), please don&rsquo;t read all rows at one time, which may induce the OOM. You can read a small portion of the data at a time like a stream-oriented file.

大意是不要一次讀取文件至內存,可能造成OOM。實際應用中應該分頁讀取,下面通過代碼進行說明:

func readPartialParquet(pageSize, page int) ([]*user, error) {
	fr, err := local.NewLocalFileReader("output.parquet")
	if err != nil {
		return nil, err
	}
	defer func() {
		_ = fr.Close()
	}()

	pr, err := reader.NewParquetReader(fr, new(user), int64(pageSize))
	if err != nil {
		return nil, err
	}
	defer pr.ReadStop()

	//num := pr.GetNumRows()
	
	pr.SkipRows(int64(pageSize * page))
	u := make([]*user, pageSize)
	if err = pr.Read(&u); err != nil {
		return nil, err
	}

	return u, nil
}

與上面函數(shù)差異不大,首先函數(shù)包括兩個參數(shù),用于指定頁大小和頁數(shù),關鍵代碼是跳過一定記錄:

  pr.SkipRows(int64(pageSize * page))

根據(jù)這個方法可以獲得總行數(shù),pr.GetNumRows(),然后結合頁大小計算總頁數(shù),最后循環(huán)可以實現(xiàn)分頁查詢。

計算列平均值

既然使用了Parquet列存儲格式,下面演示下如何計算Score列的平均值。

func calcScoreAVG() (float64, error) {
  fr, err := local.NewLocalFileReader("output.parquet")
  if err != nil {
    return 0.0, err
  }
  pr, err := reader.NewParquetColumnReader(fr, recordNumber)
  if err != nil {
    return 0.0, err
  }
  num := int(pr.GetNumRows())

  data, _, _, err := pr.ReadColumnByPath("parquet_go_root\u0001score", num)
  if err != nil {
    return 0.0, err
  }
  var result float64
  for _, i := range data {
    result += i.(float64)
  }
  return (result / float64(num)), nil
}

首先打開文件,然后調用pr.GetNumRows()方法獲取總行數(shù)。然后基于路徑指定列,其中parquet_go_root為根路徑,因為前面使用字節(jié)數(shù)組,這里分割符變?yōu)閈u0001,完整路徑為:parquet_go_root\u0001score。

到此,關于“Golang如何處理parquet文件”的學習就結束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學習,快去試試吧!若想繼續(xù)學習更多相關知識,請繼續(xù)關注億速云網(wǎng)站,小編會繼續(xù)努力為大家?guī)砀鄬嵱玫奈恼拢?/p>

向AI問一下細節(jié)

免責聲明:本站發(fā)布的內容(圖片、視頻和文字)以原創(chuàng)、轉載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權內容。

AI