溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

go.cqrs中DomainRepository的作用是什么

發(fā)布時間:2021-06-22 16:24:23 來源:億速云 閱讀:135 作者:Leah 欄目:編程語言

go.cqrs中DomainRepository的作用是什么,相信很多沒有經(jīng)驗的人對此束手無策,為此本文總結(jié)了問題出現(xiàn)的原因和解決方法,通過這篇文章希望你能解決這個問題。

DomainRepository

// DomainRepository is the interface that all domain repositories should implement.
type DomainRepository interface {
	//Loads an aggregate of the given type and ID
	Load(aggregateTypeName string, aggregateID string) (AggregateRoot, error)

	//Saves the aggregate.
	Save(aggregate AggregateRoot, expectedVersion *int) error
}

DomainRepository定義了Load、Save方法

GetEventStoreCommonDomainRepo

// GetEventStoreCommonDomainRepo is an implementation of the DomainRepository
// that uses GetEventStore for persistence
type GetEventStoreCommonDomainRepo struct {
	eventStore         *goes.Client
	eventBus           EventBus
	streamNameDelegate StreamNamer
	aggregateFactory   AggregateFactory
	eventFactory       EventFactory
}

// Load will load all events from a stream and apply those events to an aggregate
// of the type specified.
//
// The aggregate type and id will be passed to the configured StreamNamer to
// get the stream name.
func (r *GetEventStoreCommonDomainRepo) Load(aggregateType, id string) (AggregateRoot, error) {

	if r.aggregateFactory == nil {
		return nil, fmt.Errorf("The common domain repository has no Aggregate Factory.")
	}

	if r.streamNameDelegate == nil {
		return nil, fmt.Errorf("The common domain repository has no stream name delegate.")
	}

	if r.eventFactory == nil {
		return nil, fmt.Errorf("The common domain has no Event Factory.")
	}

	aggregate := r.aggregateFactory.GetAggregate(aggregateType, id)
	if aggregate == nil {
		return nil, fmt.Errorf("The repository has no aggregate factory registered for aggregate type: %s", aggregateType)
	}

	streamName, err := r.streamNameDelegate.GetStreamName(aggregateType, id)
	if err != nil {
		return nil, err
	}

	stream := r.eventStore.NewStreamReader(streamName)
	for stream.Next() {
		switch err := stream.Err().(type) {
		case nil:
			break
		case *url.Error, *goes.ErrTemporarilyUnavailable:
			return nil, &ErrRepositoryUnavailable{}
		case *goes.ErrNoMoreEvents:
			return aggregate, nil
		case *goes.ErrUnauthorized:
			return nil, &ErrUnauthorized{}
		case *goes.ErrNotFound:
			return nil, &ErrAggregateNotFound{AggregateType: aggregateType, AggregateID: id}
		default:
			return nil, &ErrUnexpected{Err: err}
		}

		event := r.eventFactory.GetEvent(stream.EventResponse().Event.EventType)

		//TODO: No test for meta
		meta := make(map[string]string)
		stream.Scan(event, &meta)
		if stream.Err() != nil {
			return nil, stream.Err()
		}
		em := NewEventMessage(id, event, Int(stream.EventResponse().Event.EventNumber))
		for k, v := range meta {
			em.SetHeader(k, v)
		}
		aggregate.Apply(em, false)
		aggregate.IncrementVersion()
	}

	return aggregate, nil

}

// Save persists an aggregate
func (r *GetEventStoreCommonDomainRepo) Save(aggregate AggregateRoot, expectedVersion *int) error {

	if r.streamNameDelegate == nil {
		return fmt.Errorf("The common domain repository has no stream name delagate.")
	}

	resultEvents := aggregate.GetChanges()

	streamName, err := r.streamNameDelegate.GetStreamName(typeOf(aggregate), aggregate.AggregateID())
	if err != nil {
		return err
	}

	if len(resultEvents) > 0 {

		evs := make([]*goes.Event, len(resultEvents))

		for k, v := range resultEvents {
			//TODO: There is no test for this code
			v.SetHeader("AggregateID", aggregate.AggregateID())
			evs[k] = goes.NewEvent("", v.EventType(), v.Event(), v.GetHeaders())
		}

		streamWriter := r.eventStore.NewStreamWriter(streamName)
		err := streamWriter.Append(expectedVersion, evs...)
		switch e := err.(type) {
		case nil:
			break
		case *goes.ErrConcurrencyViolation:
			return &ErrConcurrencyViolation{Aggregate: aggregate, ExpectedVersion: expectedVersion, StreamName: streamName}
		case *goes.ErrUnauthorized:
			return &ErrUnauthorized{}
		case *goes.ErrTemporarilyUnavailable:
			return &ErrRepositoryUnavailable{}
		default:
			return &ErrUnexpected{Err: e}
		}
	}

	aggregate.ClearChanges()

	for k, v := range resultEvents {
		if expectedVersion == nil {
			r.eventBus.PublishEvent(v)
		} else {
			em := NewEventMessage(v.AggregateID(), v.Event(), Int(*expectedVersion+k+1))
			r.eventBus.PublishEvent(em)
		}
	}

	return nil
}

GetEventStoreCommonDomainRepo定義了eventStore、eventBus、streamNameDelegate、aggregateFactory、eventFactory屬性,其Load方法先通過r.aggregateFactory.GetAggregate獲取aggregate,再通過r.streamNameDelegate.GetStreamName(aggregateType, id)獲取streamName,然后通過r.eventStore.NewStreamReader去遍歷event,挨個執(zhí)行aggregate.Apply(em, false)及aggregate.IncrementVersion();其Save方法先通過aggregate.GetChanges()獲取resultEvents,再遍歷resultEvents構(gòu)造goes.Event,之后通過streamWriter.Append寫入,然后執(zhí)行aggregate.ClearChanges(),最后執(zhí)行r.eventBus.PublishEvent

小結(jié)

go.cqrs的DomainRepository定義了Load、Save方法;GetEventStoreCommonDomainRepo實現(xiàn)了DomainRepository接口,其Load方法主要是讀取event,然后挨個執(zhí)行aggregate.Apply;其Save方法主要是將aggregate.GetChanges()轉(zhuǎn)換為event,然后通過streamWriter.Append寫入,然后執(zhí)行aggregate.ClearChanges(),最后執(zhí)行r.eventBus.PublishEvent。

看完上述內(nèi)容,你們掌握go.cqrs中DomainRepository的作用是什么的方法了嗎?如果還想學到更多技能或想了解更多相關(guān)內(nèi)容,歡迎關(guān)注億速云行業(yè)資訊頻道,感謝各位的閱讀!

向AI問一下細節(jié)

免責聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI