溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

???????Golang實現RabbitMQ中死信隊列的情況有哪些

發(fā)布時間:2023-03-01 11:17:27 來源:億速云 閱讀:95 作者:iii 欄目:開發(fā)技術

這篇文章主要講解了“Golang實現RabbitMQ中死信隊列的情況有哪些”,文中的講解內容簡單清晰,易于學習與理解,下面請大家跟著小編的思路慢慢深入,一起來研究和學習“Golang實現RabbitMQ中死信隊列的情況有哪些”吧!

1、造成死信隊列的主要原因

  • 消費者超時未應答

  • 隊列的容量有限

  • 消費者拒絕了的消息

2、操作邏輯圖

???????Golang實現RabbitMQ中死信隊列的情況有哪些

3、代碼實戰(zhàn)

其實整體的思路就是分別創(chuàng)建一個normal_exchange、dead_exchange、normal_queue、dead_queue,然后將normal_exchange與normal_queue進行綁定,將dead_exchange與dead_queue進行綁定,這里比較關鍵的一個點在于說如何將normal_queue與dead_exchange進行綁定,這樣才能將錯誤的消息傳遞過來。下面就是這段代碼的關鍵。

// 聲明一個normal隊列
    _, err = ch.QueueDeclare(
        constant.NormalQueue,
        true,
        false,
        false,
        false,
        amqp.Table{
            //"x-message-ttl":             5000,                    // 指定過期時間
            //"x-max-length":              6,                        // 指定長度。超過這個長度的消息會發(fā)送到dead_exchange中
            "x-dead-letter-exchange":    constant.DeadExchange,    // 指定死信交換機
            "x-dead-letter-routing-key": constant.DeadRoutingKey,  // 指定死信routing-key
        })

3.1 針對原因1:消費者超出時間未應答

consumer1.go

package day07

import (
    amqp "github.com/rabbitmq/amqp091-go"
    "log"
    "v1/utils"
)

type Constant struct {
    NormalExchange   string
    DeadExchange     string
    NormalQueue      string
    DeadQueue        string
    NormalRoutingKey string
    DeadRoutingKey   string
}

func Consumer1() {
    // 獲取連接
    ch := utils.GetChannel()
    // 創(chuàng)建一個變量常量
    constant := Constant{
        NormalExchange:   "normal_exchange",
        DeadExchange:     "dead_exchange",
        NormalQueue:      "normal_queue",
        DeadQueue:        "dead_queue",
        NormalRoutingKey: "normal_key",
        DeadRoutingKey:   "dead_key",
    }
    // 聲明normal交換機
    err := ch.ExchangeDeclare(
        constant.NormalExchange,
        amqp.ExchangeDirect,
        true,
        false,
        false,
        false,
        nil,
    )
    utils.FailOnError(err, "Failed to declare a normal exchange")
    // 聲明一個dead交換機
    err = ch.ExchangeDeclare(
        constant.DeadExchange,
        amqp.ExchangeDirect,
        true,
        false,
        false,
        false,
        nil,
    )
    utils.FailOnError(err, "Failed to declare a dead exchange")

    // 聲明一個normal隊列
    _, err = ch.QueueDeclare(
        constant.NormalQueue,
        true,
        false,
        false,
        false,
        amqp.Table{
            "x-message-ttl": 5000, // 指定過期時間
            //"x-max-length":              6,
            "x-dead-letter-exchange":    constant.DeadExchange,   // 指定死信交換機
            "x-dead-letter-routing-key": constant.DeadRoutingKey, // 指定死信routing-key
        })
    utils.FailOnError(err, "Failed to declare a normal queue")
    // 聲明一個dead隊列:注意不要給死信隊列設置消息時間,否者死信隊列里面的信息會再次過期
    _, err = ch.QueueDeclare(
        constant.DeadQueue,
        true,
        false,
        false,
        false,
        nil)
    utils.FailOnError(err, "Failed to declare a dead queue")

    // 將normal_exchange與normal_queue進行綁定
    err = ch.QueueBind(constant.NormalQueue, constant.NormalRoutingKey, constant.NormalExchange, false, nil)
    utils.FailOnError(err, "Failed to binding normal_exchange with normal_queue")
    // 將dead_exchange與dead_queue進行綁定
    err = ch.QueueBind(constant.DeadQueue, constant.DeadRoutingKey, constant.DeadExchange, false, nil)
    utils.FailOnError(err, "Failed to binding dead_exchange with dead_queue")

    // 消費消息
    msgs, err := ch.Consume(constant.NormalQueue,
        "",
        false, // 這個地方一定要關閉自動應答
        false,
        false,
        false,
        nil)
    utils.FailOnError(err, "Failed to consume in Consumer1")

    var forever chan struct{}

    go func() {
        for d := range msgs {
            if err := d.Reject(false); err != nil {
                utils.FailOnError(err, "Failed to Reject a message")
            }
        }
    }()
    log.Printf(" [*] Waiting for logs. To exit press CTRL+C")

    <-forever
}

consumer2.go

package day07

import (
    amqp "github.com/rabbitmq/amqp091-go"
    "log"
    "v1/utils"
)

func Consumer2() {
    // 拿取信道
    ch := utils.GetChannel()

    // 聲明一個交換機
    err := ch.ExchangeDeclare(
        "dead_exchange",
        amqp.ExchangeDirect,
        true,
        false,
        false,
        false,
        nil)
    utils.FailOnError(err, "Failed to Declare a exchange")

    // 接收消息的應答
    msgs, err := ch.Consume("dead_queue",
        "",
        false,
        false,
        false,
        false,
        nil,
    )

    var forever chan struct{}
    go func() {
        for d := range msgs {
            log.Printf("[x] %s", d.Body)
            // 開啟手動應答?
            d.Ack(false)
        }
    }()
    log.Printf(" [*] Waiting for logs. To exit press CTRL+C")
    <-forever

}

produce.go

package day07

import (
    "context"
    amqp "github.com/rabbitmq/amqp091-go"
    "strconv"
    "time"
    "v1/utils"
)

func Produce() {
    // 獲取信道
    ch := utils.GetChannel()
    // 聲明一個交換機
    err := ch.ExchangeDeclare(
        "normal_exchange",
        amqp.ExchangeDirect,
        true,
        false,
        false,
        false,
        nil)
    utils.FailOnError(err, "Failed to declare a exchange")
    ctx, cancer := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancer()

    // 發(fā)送了10條消息
    for i := 0; i < 10; i++ {
        msg := "Info:" + strconv.Itoa(i)
        ch.PublishWithContext(ctx,
            "normal_exchange",
            "normal_key",
            false,
            false,
            amqp.Publishing{
                ContentType: "text/plain",
                Body:        []byte(msg),
            })
    }
}

3.2 針對原因2:限制一定的長度

只需要改變consumer1.go中的對normal_queue的聲明

// 聲明一個normal隊列
    _, err = ch.QueueDeclare(
        constant.NormalQueue,
        true,
        false,
        false,
        false,
        amqp.Table{
            //"x-message-ttl": 5000, // 指定過期時間
            "x-max-length":              6,
            "x-dead-letter-exchange":    constant.DeadExchange,   // 指定死信交換機
            "x-dead-letter-routing-key": constant.DeadRoutingKey, // 指定死信routing-key
        })

3.3 針對原因3:消費者拒絕的消息回到死信隊列中

這里需要完成兩點工作

工作1:需要在consumer1中作出拒絕的操作

go func() {
        for d := range msgs {
            if err := d.Reject(false); err != nil {
                utils.FailOnError(err, "Failed to Reject a message")
            }
        }
    }()

工作2:如果你consume的時候開啟了自動應答一定要關閉

// 消費消息
    msgs, err := ch.Consume(constant.NormalQueue,
        "",
        false, // 這個地方一定要關閉自動應答
        false,
        false,
        false,
        nil)

其他的部分不需要改變,按照問題1中的設計即可。

感謝各位的閱讀,以上就是“Golang實現RabbitMQ中死信隊列的情況有哪些”的內容了,經過本文的學習后,相信大家對Golang實現RabbitMQ中死信隊列的情況有哪些這一問題有了更深刻的體會,具體使用情況還需要大家實踐驗證。這里是億速云,小編將為大家推送更多相關知識點的文章,歡迎關注!

向AI問一下細節(jié)

免責聲明:本站發(fā)布的內容(圖片、視頻和文字)以原創(chuàng)、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI