溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點(diǎn)擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

Go語言同步與異步執(zhí)行多個任務(wù)封裝的示例分析

發(fā)布時間:2021-06-24 09:44:20 來源:億速云 閱讀:125 作者:小新 欄目:編程語言

這篇文章主要介紹了Go語言同步與異步執(zhí)行多個任務(wù)封裝的示例分析,具有一定借鑒價值,感興趣的朋友可以參考下,希望大家閱讀完這篇文章之后大有收獲,下面讓小編帶著大家一起了解一下。

同步執(zhí)行類RunnerAsync

支持返回超時檢測,系統(tǒng)中斷檢測

錯誤常量定義

//超時錯誤
var ErrTimeout = errors.New("received timeout")
//操作系統(tǒng)系統(tǒng)中斷錯誤
var ErrInterrupt = errors.New("received interrupt")

實(shí)現(xiàn)代碼如下

package task
import (
 "os"
 "time"
 "os/signal"
 "sync"
)
 
//異步執(zhí)行任務(wù)
type Runner struct {
 //操作系統(tǒng)的信號檢測
 interrupt chan os.Signal
 //記錄執(zhí)行完成的狀態(tài)
 complete chan error
 //超時檢測
 timeout <-chan time.Time
 //保存所有要執(zhí)行的任務(wù),順序執(zhí)行
 tasks []func(id int) error
 waitGroup sync.WaitGroup
 lock sync.Mutex
 errs []error
}
 
//new一個Runner對象
func NewRunner(d time.Duration) *Runner {
 return &Runner{
 interrupt: make(chan os.Signal, 1),
 complete: make(chan error),
 timeout: time.After(d),
 waitGroup: sync.WaitGroup{},
 lock: sync.Mutex{},
 }
}
 
//添加一個任務(wù)
func (this *Runner) Add(tasks ...func(id int) error) {
 this.tasks = append(this.tasks, tasks...)
}
 
//啟動Runner,監(jiān)聽錯誤信息
func (this *Runner) Start() error {
 //接收操作系統(tǒng)信號
 signal.Notify(this.interrupt, os.Interrupt)
 //并發(fā)執(zhí)行任務(wù)
 go func() {
 this.complete <- this.Run()
 }()
 select {
 //返回執(zhí)行結(jié)果
 case err := <-this.complete:
 return err
 //超時返回
 case <-this.timeout:
 return ErrTimeout
 }
}
 
//異步執(zhí)行所有的任務(wù)
func (this *Runner) Run() error {
 for id, task := range this.tasks {
 if this.gotInterrupt() {
  return ErrInterrupt
 }
 this.waitGroup.Add(1)
 go func(id int) {
  this.lock.Lock()
  //執(zhí)行任務(wù)
  err := task(id)
  //加鎖保存到結(jié)果集中
  this.errs = append(this.errs, err)
 
  this.lock.Unlock()
  this.waitGroup.Done()
 }(id)
 }
 this.waitGroup.Wait()
 
 return nil
}
 
//判斷是否接收到操作系統(tǒng)中斷信號
func (this *Runner) gotInterrupt() bool {
 select {
 case <-this.interrupt:
 //停止接收別的信號
 signal.Stop(this.interrupt)
 return true
 //正常執(zhí)行
 default:
 return false
 }
}
 
//獲取執(zhí)行完的error
func (this *Runner) GetErrs() []error {
 return this.errs
}

使用方法    

Add添加一個任務(wù),任務(wù)為接收int類型的一個閉包

Start開始執(zhí)行傷,返回一個error類型,nil為執(zhí)行完畢, ErrTimeout代表執(zhí)行超時,ErrInterrupt代表執(zhí)行被中斷(類似Ctrl + C操作)

測試示例代碼

package task
import (
 "testing"
 "time"
 "fmt"
 "os"
 "runtime"
)
 
func TestRunnerAsync_Start(t *testing.T) {
 //開啟多核
 runtime.GOMAXPROCS(runtime.NumCPU())
 //創(chuàng)建runner對象,設(shè)置超時時間
 runner := NewRunnerAsync(8 * time.Second)
 //添加運(yùn)行的任務(wù)
 runner.Add(
 createTaskAsync(),
 createTaskAsync(),
 createTaskAsync(),
 createTaskAsync(),
 createTaskAsync(),
 createTaskAsync(),
 createTaskAsync(),
 createTaskAsync(),
 createTaskAsync(),
 createTaskAsync(),
 createTaskAsync(),
 createTaskAsync(),
 createTaskAsync(),
 )
 fmt.Println("同步執(zhí)行任務(wù)")
 //開始執(zhí)行任務(wù)
 if err := runner.Start(); err != nil {
 switch err {
 case ErrTimeout:
  fmt.Println("執(zhí)行超時")
  os.Exit(1)
 case ErrInterrupt:
  fmt.Println("任務(wù)被中斷")
  os.Exit(2)
 }
 }
 t.Log("執(zhí)行結(jié)束")
}
 
//創(chuàng)建要執(zhí)行的任務(wù)
func createTaskAsync() func(id int) {
 return func(id int) {
 fmt.Printf("正在執(zhí)行%v個任務(wù)\n", id)
 //模擬任務(wù)執(zhí)行,sleep兩秒
 //time.Sleep(1 * time.Second)
 }
}

執(zhí)行結(jié)果  

同步執(zhí)行任務(wù)
正在執(zhí)行0個任務(wù)
正在執(zhí)行1個任務(wù)
正在執(zhí)行2個任務(wù)
正在執(zhí)行3個任務(wù)
正在執(zhí)行4個任務(wù)
正在執(zhí)行5個任務(wù)
正在執(zhí)行6個任務(wù)
正在執(zhí)行7個任務(wù)
正在執(zhí)行8個任務(wù)
正在執(zhí)行9個任務(wù)
正在執(zhí)行10個任務(wù)
正在執(zhí)行11個任務(wù)
正在執(zhí)行12個任務(wù)
 runnerAsync_test.go:49: 執(zhí)行結(jié)束

異步執(zhí)行類Runner

支持返回超時檢測,系統(tǒng)中斷檢測

實(shí)現(xiàn)代碼如下

package task
import (
 "os"
 "time"
 "os/signal"
 "sync"
)
 
//異步執(zhí)行任務(wù)
type Runner struct {
 //操作系統(tǒng)的信號檢測
 interrupt chan os.Signal
 //記錄執(zhí)行完成的狀態(tài)
 complete chan error
 //超時檢測
 timeout <-chan time.Time
 //保存所有要執(zhí)行的任務(wù),順序執(zhí)行
 tasks []func(id int) error
 waitGroup sync.WaitGroup
 lock sync.Mutex
 errs []error
}
 
//new一個Runner對象
func NewRunner(d time.Duration) *Runner {
 return &Runner{
  interrupt: make(chan os.Signal, 1),
  complete: make(chan error),
  timeout: time.After(d),
  waitGroup: sync.WaitGroup{},
  lock:  sync.Mutex{},
 }
}
 
//添加一個任務(wù)
func (this *Runner) Add(tasks ...func(id int) error) {
 this.tasks = append(this.tasks, tasks...)
}
 
//啟動Runner,監(jiān)聽錯誤信息
func (this *Runner) Start() error {
 //接收操作系統(tǒng)信號
 signal.Notify(this.interrupt, os.Interrupt)
 //并發(fā)執(zhí)行任務(wù)
 go func() {
  this.complete <- this.Run()
 }()
 select {
 //返回執(zhí)行結(jié)果
 case err := <-this.complete:
  return err
  //超時返回
 case <-this.timeout:
  return ErrTimeout
 }
}
 
//異步執(zhí)行所有的任務(wù)
func (this *Runner) Run() error {
 for id, task := range this.tasks {
  if this.gotInterrupt() {
   return ErrInterrupt
  }
  this.waitGroup.Add(1)
  go func(id int) {
   this.lock.Lock()
   //執(zhí)行任務(wù)
   err := task(id)
   //加鎖保存到結(jié)果集中
   this.errs = append(this.errs, err)
   this.lock.Unlock()
   this.waitGroup.Done()
  }(id)
 }
 this.waitGroup.Wait()
 return nil
}
 
//判斷是否接收到操作系統(tǒng)中斷信號
func (this *Runner) gotInterrupt() bool {
 select {
 case <-this.interrupt:
  //停止接收別的信號
  signal.Stop(this.interrupt)
  return true
  //正常執(zhí)行
 default:
  return false
 }
}
 
//獲取執(zhí)行完的error
func (this *Runner) GetErrs() []error {
 return this.errs
}

使用方法    

Add添加一個任務(wù),任務(wù)為接收int類型,返回類型error的一個閉包

Start開始執(zhí)行傷,返回一個error類型,nil為執(zhí)行完畢, ErrTimeout代表執(zhí)行超時,ErrInterrupt代表執(zhí)行被中斷(類似Ctrl + C操作)

getErrs獲取所有的任務(wù)執(zhí)行結(jié)果

測試示例代碼

package task
import (
 "testing"
 "time"
 "fmt"
 "os"
 "runtime"
)
 
func TestRunner_Start(t *testing.T) {
 //開啟多核心
 runtime.GOMAXPROCS(runtime.NumCPU())
 //創(chuàng)建runner對象,設(shè)置超時時間
 runner := NewRunner(18 * time.Second)
 //添加運(yùn)行的任務(wù)
 runner.Add(
  createTask(),
  createTask(),
  createTask(),
  createTask(),
  createTask(),
  createTask(),
  createTask(),
  createTask(),
  createTask(),
  createTask(),
  createTask(),
  createTask(),
  createTask(),
  createTask(),
 )
 fmt.Println("異步執(zhí)行任務(wù)")
 //開始執(zhí)行任務(wù)
 if err := runner.Start(); err != nil {
  switch err {
  case ErrTimeout:
   fmt.Println("執(zhí)行超時")
   os.Exit(1)
  case ErrInterrupt:
   fmt.Println("任務(wù)被中斷")
   os.Exit(2)
  }
 }
 t.Log("執(zhí)行結(jié)束")
 t.Log(runner.GetErrs())
}
 
//創(chuàng)建要執(zhí)行的任務(wù)
func createTask() func(id int) error {
 return func(id int) error {
  fmt.Printf("正在執(zhí)行%v個任務(wù)\n", id)
  //模擬任務(wù)執(zhí)行,sleep
  //time.Sleep(1 * time.Second)
  return nil
 }
}

執(zhí)行結(jié)果

異步執(zhí)行任務(wù)
正在執(zhí)行2個任務(wù)
正在執(zhí)行1個任務(wù)
正在執(zhí)行4個任務(wù)
正在執(zhí)行3個任務(wù)
正在執(zhí)行6個任務(wù)
正在執(zhí)行5個任務(wù)
正在執(zhí)行9個任務(wù)
正在執(zhí)行7個任務(wù)
正在執(zhí)行10個任務(wù)
正在執(zhí)行13個任務(wù)
正在執(zhí)行8個任務(wù)
正在執(zhí)行11個任務(wù)
正在執(zhí)行12個任務(wù)
正在執(zhí)行0個任務(wù)
 runner_test.go:49: 執(zhí)行結(jié)束
 runner_test.go:51: [<nil> <nil> <nil> <nil> <nil> <nil> <nil> <nil> <nil> <nil> <nil> <nil> <nil> <nil>]

感謝你能夠認(rèn)真閱讀完這篇文章,希望小編分享的“Go語言同步與異步執(zhí)行多個任務(wù)封裝的示例分析”這篇文章對大家有幫助,同時也希望大家多多支持億速云,關(guān)注億速云行業(yè)資訊頻道,更多相關(guān)知識等著你來學(xué)習(xí)!

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報,并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI