您好,登錄后才能下訂單哦!
package main
import (
"fmt"
"time"
"os"
"bufio"
"io"
)
//讀接口做抽象優(yōu)化
type Reader interface {
Read(rc chan []byte)
}
//寫接口做抽象優(yōu)化
type Writer interface {
Write(wc chan []byte)
}
type LogProcess struct {
rc chan []byte //讀chan
wc chan []byte //寫chan
read Reader
write Writer
}
type ReadFromFile struct {
path string //讀取文件的路徑
}
type WriteToInfluxDB struct {
influxDBDsn string //influx data source
}
func (r *ReadFromFile) Read(rc chan []byte) {
//讀取模塊
//打開文件
f, err := os.Open(r.path)
if err != nil {
panic(fmt.Sprintf("open file error:%s", err.Error()))
}
//從文件末尾開始逐行讀取文件內(nèi)容
f.Seek(0, 2)//將文件字符指針移動(dòng)到最后(0偏移量,2移動(dòng)字符指針到末尾)
rd := bufio.NewReader(f)
for {
line, err := rd.ReadBytes('\n') //讀取文件內(nèi)容直到遇到換行符(即讀取一行內(nèi)容)
if err == io.EOF{ //讀取到文件末尾也會(huì)返回err
time.Sleep(time.Second)
continue //當(dāng)前循環(huán)不讓繼續(xù)向下執(zhí)行
}else if err != nil {
panic(fmt.Sprintf("ReadBytes error:%s", err.Error()))
}
rc <- line[:len(line)-1]
}
}
func (l *LogProcess) Process() {
//解析模塊
for v := range l.rc {
l.wc <- v
}
}
func (w *WriteToInfluxDB) Write(wc chan []byte) {
//寫入模塊
for v := range wc {
fmt.Println(string(v))
}
}
func main() {
r := &ReadFromFile{
path: "D:/go_work_dir/logs/access.log",
}
w := &WriteToInfluxDB{
influxDBDsn: "username&password..",
}
lp := &LogProcess{
rc: make(chan []byte),
wc: make(chan []byte),
read: r,
write: w,
}
go lp.read.Read(lp.rc)
go lp.Process()
go lp.write.Write(lp.wc)
time.Sleep(10000 * time.Second)
}
正則解析等方法未寫...
注意,還有可以優(yōu)化的地方,如:
正則解析會(huì)比較慢,可以for給它多開幾個(gè)goroutine:
而chan沒有buff,讀寫要同時(shí)完成才行,會(huì)有阻塞。所以給它加buff,也就是相當(dāng)于加了緩存:
還要記住讀取到了日志的哪個(gè)行數(shù)
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。