溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

golang中tunny的原理和作用是什么

發(fā)布時(shí)間:2021-07-05 17:00:11 來(lái)源:億速云 閱讀:301 作者:chen 欄目:編程語(yǔ)言

本篇內(nèi)容介紹了“golang中tunny的原理和作用是什么”的有關(guān)知識(shí),在實(shí)際案例的操作過(guò)程中,不少人都會(huì)遇到這樣的困境,接下來(lái)就讓小編帶領(lǐng)大家學(xué)習(xí)一下如何處理這些情況吧!希望大家仔細(xì)閱讀,能夠?qū)W有所成!

本文主要研究一下tunny

Worker

type Worker interface {
	// Process will synchronously perform a job and return the result.
	Process(interface{}) interface{}

	// BlockUntilReady is called before each job is processed and must block the
	// calling goroutine until the Worker is ready to process the next job.
	BlockUntilReady()

	// Interrupt is called when a job is cancelled. The worker is responsible
	// for unblocking the Process implementation.
	Interrupt()

	// Terminate is called when a Worker is removed from the processing pool
	// and is responsible for cleaning up any held resources.
	Terminate()
}

Worker接口定義了Process、BlockUntilReady、Interrupt、Terminate方法

closureWorker

type closureWorker struct {
	processor func(interface{}) interface{}
}

func (w *closureWorker) Process(payload interface{}) interface{} {
	return w.processor(payload)
}

func (w *closureWorker) BlockUntilReady() {}
func (w *closureWorker) Interrupt()       {}
func (w *closureWorker) Terminate()       {}

closureWorker定義了processor屬性,它實(shí)現(xiàn)了Worker接口的Process、BlockUntilReady、Interrupt、Terminate方法,其中Process方法委托給processor

callbackWorker

type callbackWorker struct{}

func (w *callbackWorker) Process(payload interface{}) interface{} {
	f, ok := payload.(func())
	if !ok {
		return ErrJobNotFunc
	}
	f()
	return nil
}

func (w *callbackWorker) BlockUntilReady() {}
func (w *callbackWorker) Interrupt()       {}
func (w *callbackWorker) Terminate()       {}

callbackWorker定義了processor屬性,它實(shí)現(xiàn)了Worker接口的Process、BlockUntilReady、Interrupt、Terminate方法,其中Process方法執(zhí)行的是payload函數(shù)

Pool

type Pool struct {
	queuedJobs int64

	ctor    func() Worker
	workers []*workerWrapper
	reqChan chan workRequest

	workerMut sync.Mutex
}

func New(n int, ctor func() Worker) *Pool {
	p := &Pool{
		ctor:    ctor,
		reqChan: make(chan workRequest),
	}
	p.SetSize(n)

	return p
}

func NewFunc(n int, f func(interface{}) interface{}) *Pool {
	return New(n, func() Worker {
		return &closureWorker{
			processor: f,
		}
	})
}

func NewCallback(n int) *Pool {
	return New(n, func() Worker {
		return &callbackWorker{}
	})
}

Pool定義了queuedJobs、ctor、workers、reqChan、workerMut屬性;New方法根據(jù)n和ctor創(chuàng)建Pool;NewFunc方法根據(jù)n和f來(lái)創(chuàng)建closureWorker;NewCallback方法創(chuàng)建callbackWorker

Process

func (p *Pool) Process(payload interface{}) interface{} {
	atomic.AddInt64(&p.queuedJobs, 1)

	request, open := <-p.reqChan
	if !open {
		panic(ErrPoolNotRunning)
	}

	request.jobChan <- payload

	payload, open = <-request.retChan
	if !open {
		panic(ErrWorkerClosed)
	}

	atomic.AddInt64(&p.queuedJobs, -1)
	return payload
}

Process方法首先遞增queuedJobs,然后從reqChan讀取request,然后往jobChan寫(xiě)入payload,之后再等待retChan,最后遞減queuedJobs

SetSize

func (p *Pool) SetSize(n int) {
	p.workerMut.Lock()
	defer p.workerMut.Unlock()

	lWorkers := len(p.workers)
	if lWorkers == n {
		return
	}

	// Add extra workers if N > len(workers)
	for i := lWorkers; i < n; i++ {
		p.workers = append(p.workers, newWorkerWrapper(p.reqChan, p.ctor()))
	}

	// Asynchronously stop all workers > N
	for i := n; i < lWorkers; i++ {
		p.workers[i].stop()
	}

	// Synchronously wait for all workers > N to stop
	for i := n; i < lWorkers; i++ {
		p.workers[i].join()
	}

	// Remove stopped workers from slice
	p.workers = p.workers[:n]
}

SetSize方法首先通過(guò)workerMut加鎖,然后根據(jù)lWorkers創(chuàng)建newWorkerWrapper,之后執(zhí)行worker.stop,再執(zhí)行worker.join(),然后清空workers

Close

func (p *Pool) Close() {
	p.SetSize(0)
	close(p.reqChan)
}

Close方法執(zhí)行SetSize(0)及close(p.reqChan)

實(shí)例

func TestFuncJob(t *testing.T) {
	pool := NewFunc(10, func(in interface{}) interface{} {
		intVal := in.(int)
		return intVal * 2
	})
	defer pool.Close()

	for i := 0; i < 10; i++ {
		ret := pool.Process(10)
		if exp, act := 20, ret.(int); exp != act {
			t.Errorf("Wrong result: %v != %v", act, exp)
		}
	}
}

TestFuncJob通過(guò)NewFunc創(chuàng)建pool,

小結(jié)

tunny的Worker接口定義了Process、BlockUntilReady、Interrupt、Terminate方法;NewFunc方法創(chuàng)建的是closureWorker,NewCallback方法創(chuàng)建的是callbackWorker。

“golang中tunny的原理和作用是什么”的內(nèi)容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關(guān)的知識(shí)可以關(guān)注億速云網(wǎng)站,小編將為大家輸出更多高質(zhì)量的實(shí)用文章!

向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI