溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

golang如何實現(xiàn)rabbitmq監(jiān)聽

發(fā)布時間:2023-05-10 11:30:34 來源:億速云 閱讀:106 作者:zzz 欄目:編程語言

本篇內容介紹了“golang如何實現(xiàn)rabbitmq監(jiān)聽”的有關知識,在實際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領大家學習一下如何處理這些情況吧!希望大家仔細閱讀,能夠學有所成!

準備工作

在開始之前,需要確保已經(jīng)安裝RabbitMQ。由于RabbitMQ依賴Erlang,所以還需要安裝Erlang。

安裝完成之后,我們需要安裝Golang第三方包。其中,AMQP包是必不可少的,它可以讓我們很方便地連接和操作RabbitMQ。

go get github.com/streadway/amqp

代碼實現(xiàn)

首先,我們需要連接到RabbitMQ。連接成功后,我們需要聲明一個名為“test”、類型為“fanout”的exchange。exchange是RabbitMQ中實現(xiàn)消息路由的重要組成部分,它負責接收消息并將它們分發(fā)給隊列。在這種情況下,我們將聲明一個名為“test”的exchange,并將其類型設置為“fanout”,這意味著它將消息廣播給所有訂閱了它的隊列。

conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()

ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()

err = ch.ExchangeDeclare(

"test",   // name
"fanout", // type
true,     // durable
false,    // auto-deleted
false,    // internal
false,    // no-wait
nil,      // arguments

)
failOnError(err, "Failed to declare an exchange")

接下來,我們需要創(chuàng)建一個新的、非持久的、具有自動生成名稱的隊列。在這里,我們將使用隊列的名稱來綁定它們與剛剛聲明的“test”exchange。

q, err := ch.QueueDeclare(

"",    // name
false, // durable
false, // delete when unused
true,  // exclusive
false, // no-wait
nil,   // arguments

)
failOnError(err, "Failed to declare a queue")

err = ch.QueueBind(

q.Name, // queue name
"",     // routing key
"test", // exchange
false,
nil,

)
failOnError(err, "Failed to bind a queue")

現(xiàn)在,RabbitMQ已經(jīng)準備就緒,我們可以開始監(jiān)聽它的消息了。我們可以使用Consume函數(shù)來實現(xiàn)消息監(jiān)聽,它可以使我們持續(xù)不斷地接收來自隊列的消息,并對它們進行處理。

msgs, err := ch.Consume(

q.Name, // queue name
"",     // consumer
true,   // auto-ack
false,  // exclusive
false,  // no-local
false,  // no-wait
nil,    // args

)
failOnError(err, "Failed to register a consumer")

for msg := range msgs {

log.Printf("Received a message: %s", msg.Body)

}

在以上代碼中,我們使用了ch.Consume()方法來監(jiān)聽指定隊列中的消息,并通過打印日志的方式輸出了消息內容。需要注意的是,我們使用了一個死循環(huán)來部署進行消息監(jiān)聽,這意味著我們會一直監(jiān)聽隊列,直至程序被停止或者出現(xiàn)錯誤。

完整代碼如下:

package main

import (

"log"

"github.com/streadway/amqp"

)

func failOnError(err error, msg string) {

if err != nil {
    log.Fatalf("%s: %s", msg, err)
}

}

func main() {

conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()

ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()

err = ch.ExchangeDeclare(
    "test",   // name
    "fanout", // type
    true,     // durable
    false,    // auto-deleted
    false,    // internal
    false,    // no-wait
    nil,      // arguments
)
failOnError(err, "Failed to declare an exchange")

q, err := ch.QueueDeclare(
    "",    // name
    false, // durable
    false, // delete when unused
    true,  // exclusive
    false, // no-wait
    nil,   // arguments
)
failOnError(err, "Failed to declare a queue")

err = ch.QueueBind(
    q.Name, // queue name
    "",     // routing key
    "test", // exchange
    false,
    nil,
)
failOnError(err, "Failed to bind a queue")

msgs, err := ch.Consume(
    q.Name, // queue name
    "",     // consumer
    true,   // auto-ack
    false,  // exclusive
    false,  // no-local
    false,  // no-wait
    nil,    // args
)
failOnError(err, "Failed to register a consumer")

for msg := range msgs {
    log.Printf("Received a message: %s", msg.Body)
}

}

“golang如何實現(xiàn)rabbitmq監(jiān)聽”的內容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關的知識可以關注億速云網(wǎng)站,小編將為大家輸出更多高質量的實用文章!

向AI問一下細節(jié)

免責聲明:本站發(fā)布的內容(圖片、視頻和文字)以原創(chuàng)、轉載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權內容。

AI