溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點(diǎn)擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

如何接入異步任務(wù)及使用log

發(fā)布時間:2021-10-19 15:22:57 來源:億速云 閱讀:142 作者:iii 欄目:編程語言

這篇文章主要講解了“如何接入異步任務(wù)及使用log”,文中的講解內(nèi)容簡單清晰,易于學(xué)習(xí)與理解,下面請大家跟著小編的思路慢慢深入,一起來研究和學(xué)習(xí)“如何接入異步任務(wù)及使用log”吧!

Delay Job

日常任務(wù)開放中,我們會有很多異步、批量、定時、延遲任務(wù)要處理,go-zero中有 go-queue,推薦使用 go-queue 去處理,go-queue 本身也是基于 go-zero 開發(fā)的,其本身是有兩種模式:

  • dq : 依賴于 beanstalkd ,分布式,可存儲,延遲、定時設(shè)置,關(guān)機(jī)重啟可以重新執(zhí)行,消息會丟失,使用非常簡單,go-queue中使用了redis setnx保證了每個消息只被消費(fèi)一次,使用場景主要是用來做日常任務(wù)使用

  • kq:依賴于 kafka ,這個就不多介紹啦,大名鼎鼎的 kafka ,使用場景主要是做日志用

我們主要說一下dq,kq使用也一樣的,只是依賴底層不同,如果沒使用過beanstalkd,沒接觸過beanstalkd的可以先google一下,使用起來還是挺容易的。

我在jobs下使用goctl新建了一個message-job.api服務(wù)

info(
	title: //消息任務(wù)
	desc: // 消息任務(wù)
	author: "Mikael"
	email: "13247629622@163.com"
)

type BatchSendMessageReq {}

type BatchSendMessageResp {}

service message-job-api {
	@handler batchSendMessageHandler // 批量發(fā)送短信
	post batchSendMessage(BatchSendMessageReq) returns(BatchSendMessageResp)
}

因為不需要使用路由,所以handler下的routes.go被我刪除了,在handler下新建了一個jobRun.go,內(nèi)容如下:

package handler

import (
	"fishtwo/lib/xgo"
	"fishtwo/app/jobs/message/internal/svc"
)


/**
* @Description 啟動job
* @Author Mikael
* @Date 2021/1/18 12:05
* @Version 1.0
**/

func JobRun(serverCtx *svc.ServiceContext)  {
	xgo.Go(func() {
		batchSendMessageHandler(serverCtx)
    //...many job
	})
}

其實xgo.Go就是 go batchSendMessageHandler(serverCtx) ,封裝了一下go攜程,防止野生goroutine panic

然后修改一下啟動文件message-job.go

package main

import (
   "flag"
   "fmt"

   "fishtwo/app/jobs/message/internal/config"
   "fishtwo/app/jobs/message/internal/handler"
   "fishtwo/app/jobs/message/internal/svc"

   "github.com/tal-tech/go-zero/core/conf"
   "github.com/tal-tech/go-zero/rest"
)

var configFile = flag.String("f", "etc/message-job-api.yaml", "the config file")

func main() {
   flag.Parse()

   var c config.Config
   conf.MustLoad(*configFile, &c)

   ctx := svc.NewServiceContext(c)
   server := rest.MustNewServer(c.RestConf)
   defer server.Stop()

   handler.JobRun(ctx)

   fmt.Printf("Starting server at %s:%d...\n", c.Host, c.Port)
   server.Start()
}

主要是handler.RegisterHandlers(server, ctx) 修改為handler.JobRun(ctx)

接下來,我們就可以引入dq了,首先在etc/xxx.yaml下添加dqConf

.....

DqConf:
  Beanstalks:
    - Endpoint: 127.0.0.1:7771
      Tube: tube1
    - Endpoint: 127.0.0.1:7772
      Tube: tube2
  Redis:
    Host: 127.0.0.1:6379
    Type: node

我這里本地用不同端口,模擬開了2個節(jié)點(diǎn),7771、7772

在internal/config/config.go添加配置解析對象

type Config struct {
	....
	DqConf dq.DqConf
}

修改handler/batchsendmessagehandler.go

package handler

import (
	"context"
	"fishtwo/app/jobs/message/internal/logic"
	"fishtwo/app/jobs/message/internal/svc"
	"github.com/tal-tech/go-zero/core/logx"
)

func batchSendMessageHandler(ctx *svc.ServiceContext){
	rootCxt:= context.Background()
	l := logic.NewBatchSendMessageLogic(context.Background(), ctx)
	err := l.BatchSendMessage()
	if err != nil{
		logx.WithContext(rootCxt).Error("【JOB-ERR】 : %+v ",err)
	}
}

修改logic下batchsendmessagelogic.go,寫我們的consumer消費(fèi)邏輯

package logic

import (
   "context"
   "fishtwo/app/jobs/message/internal/svc"
   "fmt"
   "github.com/tal-tech/go-zero/core/logx"
)

type BatchSendMessageLogic struct {
   logx.Logger
   ctx    context.Context
   svcCtx *svc.ServiceContext
}

func NewBatchSendMessageLogic(ctx context.Context, svcCtx *svc.ServiceContext) BatchSendMessageLogic {
   return BatchSendMessageLogic{
   	Logger: logx.WithContext(ctx),
   	ctx:    ctx,
   	svcCtx: svcCtx,
   }
}


func (l *BatchSendMessageLogic) BatchSendMessage() error {
   fmt.Println("job BatchSendMessage start")

   l.svcCtx.Consumer.Consume(func(body []byte) {
   	fmt.Printf("job BatchSendMessage %s \n" + string(body))
   })

   fmt.Printf("job BatchSendMessage finish \n")
   return nil
}

這樣就大功告成了,啟動message-job.go就ok課

go run message-job.go

之后我們就可以在業(yè)務(wù)代碼中向dq添加任務(wù),它就可以自動消費(fèi)了

producer.Delay 向dq中投遞5個延遲任務(wù):

	producer := dq.NewProducer([]dq.Beanstalk{
		{
			Endpoint: "localhost:7771",
			Tube:     "tube1",
		},
		{
			Endpoint: "localhost:7772",
			Tube:     "tube2",
		},
	})

	for i := 1000; i < 1005; i++ {
		_, err := producer.Delay([]byte(strconv.Itoa(i)), time.Second * 1)
		if err != nil {
			fmt.Println(err)
		}
	}

producer.At 可以指定某個時間執(zhí)行,非常好用,感興趣的朋友自己可以研究下。

錯誤日志

在前面說到gateway改造時候,如果眼神好的童鞋,在上面的httpresult.go中已經(jīng)看到了log的身影:

如何接入異步任務(wù)及使用log

我們在來看下rpc中怎么處理的

如何接入異步任務(wù)及使用log

是的,我在每個rpc啟動的main中加入了grpc攔截器 https://www.yuque.com/tal-tech/go-zero/ttzlo1,那讓我們看看grpc攔截器里面做了什么

如何接入異步任務(wù)及使用log

然后我代碼里面使用github/pkg/errors這個包去處理錯誤的,這個包還是很好用的

如何接入異步任務(wù)及使用log

如何接入異步任務(wù)及使用log

所以呢:

我們在 grpc 中打印日志 logx.WithContext(ctx).Errorf("【RPC-SRV-ERR】 %+v",err);

api 中打印日志 logx.WithContext(r.Context()).Error("【GATEWAY-SRV-ERR】 : %+v ",err)

go-zero 中打印日志,使用logx.WithContext會把trace-id帶入,這樣一個請求下來,比如

user-api --> user-srv --> message-srv

那如果 messsage-srv 出錯,他們?nèi)齻€是同一個 trace-id ,是不是就可以在elk通過輸入這個trace-id一次性搜索出來這條請求報錯堆棧信息呢?當(dāng)然你也可以接入 jaeger、zipkin、skywalking 等,這個我暫時還沒接入。

感謝各位的閱讀,以上就是“如何接入異步任務(wù)及使用log”的內(nèi)容了,經(jīng)過本文的學(xué)習(xí)后,相信大家對如何接入異步任務(wù)及使用log這一問題有了更深刻的體會,具體使用情況還需要大家實踐驗證。這里是億速云,小編將為大家推送更多相關(guān)知識點(diǎn)的文章,歡迎關(guān)注!

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

go
AI