您好,登錄后才能下訂單哦!
這篇文章主要講解了“Golang實現RabbitMQ中死信隊列的情況有哪些”,文中的講解內容簡單清晰,易于學習與理解,下面請大家跟著小編的思路慢慢深入,一起來研究和學習“Golang實現RabbitMQ中死信隊列的情況有哪些”吧!
消費者超時未應答
隊列的容量有限
消費者拒絕了的消息
其實整體的思路就是分別創(chuàng)建一個normal_exchange、dead_exchange、normal_queue、dead_queue,然后將normal_exchange與normal_queue進行綁定,將dead_exchange與dead_queue進行綁定,這里比較關鍵的一個點在于說如何將normal_queue與dead_exchange進行綁定,這樣才能將錯誤的消息傳遞過來。下面就是這段代碼的關鍵。
// 聲明一個normal隊列 _, err = ch.QueueDeclare( constant.NormalQueue, true, false, false, false, amqp.Table{ //"x-message-ttl": 5000, // 指定過期時間 //"x-max-length": 6, // 指定長度。超過這個長度的消息會發(fā)送到dead_exchange中 "x-dead-letter-exchange": constant.DeadExchange, // 指定死信交換機 "x-dead-letter-routing-key": constant.DeadRoutingKey, // 指定死信routing-key })
consumer1.go
package day07 import ( amqp "github.com/rabbitmq/amqp091-go" "log" "v1/utils" ) type Constant struct { NormalExchange string DeadExchange string NormalQueue string DeadQueue string NormalRoutingKey string DeadRoutingKey string } func Consumer1() { // 獲取連接 ch := utils.GetChannel() // 創(chuàng)建一個變量常量 constant := Constant{ NormalExchange: "normal_exchange", DeadExchange: "dead_exchange", NormalQueue: "normal_queue", DeadQueue: "dead_queue", NormalRoutingKey: "normal_key", DeadRoutingKey: "dead_key", } // 聲明normal交換機 err := ch.ExchangeDeclare( constant.NormalExchange, amqp.ExchangeDirect, true, false, false, false, nil, ) utils.FailOnError(err, "Failed to declare a normal exchange") // 聲明一個dead交換機 err = ch.ExchangeDeclare( constant.DeadExchange, amqp.ExchangeDirect, true, false, false, false, nil, ) utils.FailOnError(err, "Failed to declare a dead exchange") // 聲明一個normal隊列 _, err = ch.QueueDeclare( constant.NormalQueue, true, false, false, false, amqp.Table{ "x-message-ttl": 5000, // 指定過期時間 //"x-max-length": 6, "x-dead-letter-exchange": constant.DeadExchange, // 指定死信交換機 "x-dead-letter-routing-key": constant.DeadRoutingKey, // 指定死信routing-key }) utils.FailOnError(err, "Failed to declare a normal queue") // 聲明一個dead隊列:注意不要給死信隊列設置消息時間,否者死信隊列里面的信息會再次過期 _, err = ch.QueueDeclare( constant.DeadQueue, true, false, false, false, nil) utils.FailOnError(err, "Failed to declare a dead queue") // 將normal_exchange與normal_queue進行綁定 err = ch.QueueBind(constant.NormalQueue, constant.NormalRoutingKey, constant.NormalExchange, false, nil) utils.FailOnError(err, "Failed to binding normal_exchange with normal_queue") // 將dead_exchange與dead_queue進行綁定 err = ch.QueueBind(constant.DeadQueue, constant.DeadRoutingKey, constant.DeadExchange, false, nil) utils.FailOnError(err, "Failed to binding dead_exchange with dead_queue") // 消費消息 msgs, err := ch.Consume(constant.NormalQueue, "", false, // 這個地方一定要關閉自動應答 false, false, false, nil) utils.FailOnError(err, "Failed to consume in Consumer1") var forever chan struct{} go func() { for d := range msgs { if err := d.Reject(false); err != nil { utils.FailOnError(err, "Failed to Reject a message") } } }() log.Printf(" [*] Waiting for logs. To exit press CTRL+C") <-forever }
consumer2.go
package day07 import ( amqp "github.com/rabbitmq/amqp091-go" "log" "v1/utils" ) func Consumer2() { // 拿取信道 ch := utils.GetChannel() // 聲明一個交換機 err := ch.ExchangeDeclare( "dead_exchange", amqp.ExchangeDirect, true, false, false, false, nil) utils.FailOnError(err, "Failed to Declare a exchange") // 接收消息的應答 msgs, err := ch.Consume("dead_queue", "", false, false, false, false, nil, ) var forever chan struct{} go func() { for d := range msgs { log.Printf("[x] %s", d.Body) // 開啟手動應答? d.Ack(false) } }() log.Printf(" [*] Waiting for logs. To exit press CTRL+C") <-forever }
produce.go
package day07 import ( "context" amqp "github.com/rabbitmq/amqp091-go" "strconv" "time" "v1/utils" ) func Produce() { // 獲取信道 ch := utils.GetChannel() // 聲明一個交換機 err := ch.ExchangeDeclare( "normal_exchange", amqp.ExchangeDirect, true, false, false, false, nil) utils.FailOnError(err, "Failed to declare a exchange") ctx, cancer := context.WithTimeout(context.Background(), 5*time.Second) defer cancer() // 發(fā)送了10條消息 for i := 0; i < 10; i++ { msg := "Info:" + strconv.Itoa(i) ch.PublishWithContext(ctx, "normal_exchange", "normal_key", false, false, amqp.Publishing{ ContentType: "text/plain", Body: []byte(msg), }) } }
只需要改變consumer1.go中的對normal_queue的聲明
// 聲明一個normal隊列 _, err = ch.QueueDeclare( constant.NormalQueue, true, false, false, false, amqp.Table{ //"x-message-ttl": 5000, // 指定過期時間 "x-max-length": 6, "x-dead-letter-exchange": constant.DeadExchange, // 指定死信交換機 "x-dead-letter-routing-key": constant.DeadRoutingKey, // 指定死信routing-key })
這里需要完成兩點工作
工作1:需要在consumer1中作出拒絕的操作
go func() { for d := range msgs { if err := d.Reject(false); err != nil { utils.FailOnError(err, "Failed to Reject a message") } } }()
工作2:如果你consume的時候開啟了自動應答一定要關閉
// 消費消息 msgs, err := ch.Consume(constant.NormalQueue, "", false, // 這個地方一定要關閉自動應答 false, false, false, nil)
其他的部分不需要改變,按照問題1中的設計即可。
感謝各位的閱讀,以上就是“Golang實現RabbitMQ中死信隊列的情況有哪些”的內容了,經過本文的學習后,相信大家對Golang實現RabbitMQ中死信隊列的情況有哪些這一問題有了更深刻的體會,具體使用情況還需要大家實踐驗證。這里是億速云,小編將為大家推送更多相關知識點的文章,歡迎關注!
免責聲明:本站發(fā)布的內容(圖片、視頻和文字)以原創(chuàng)、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。