溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

rabbitmq的事務(wù)機(jī)制

發(fā)布時(shí)間:2021-09-01 17:54:49 來(lái)源:億速云 閱讀:198 作者:chen 欄目:大數(shù)據(jù)

這篇文章主要介紹“rabbitmq的事務(wù)機(jī)制”,在日常操作中,相信很多人在rabbitmq的事務(wù)機(jī)制問(wèn)題上存在疑惑,小編查閱了各式資料,整理出簡(jiǎn)單好用的操作方法,希望對(duì)大家解答”rabbitmq的事務(wù)機(jī)制”的疑惑有所幫助!接下來(lái),請(qǐng)跟著小編一起來(lái)學(xué)習(xí)吧!

rabbitmq事務(wù)機(jī)制:

    1:通過(guò)事務(wù)機(jī)制實(shí)現(xiàn)

         1:channel.txSelect()聲明啟動(dòng)事務(wù)模式;

         2 :  channel.txComment()提交事務(wù);

         3:channel.txRollback()回滾事務(wù);

try {
    channel.txSelect(); // 聲明事務(wù)
    // 發(fā)送消息
    channel.basicPublish("", _queueName, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"));
    channel.txCommit(); // 提交事務(wù)
} catch (Exception e) {
    channel.txRollback();
} finally {
    channel.close();
    conn.close();
}

    2:通過(guò)發(fā)送方確認(rèn)   publisher   confirm   機(jī)制實(shí)現(xiàn)。

Confirm發(fā)送方確認(rèn)模式使用和事務(wù)類似,也是通過(guò)設(shè)置Channel進(jìn)行發(fā)送方確認(rèn)的。

        Confirm的三種實(shí)現(xiàn)方式:

        方式一:channel.waitForConfirms()普通發(fā)送方確認(rèn)模式;

        方式二:channel.waitForConfirmsOrDie()批量確認(rèn)模式;

        方式三:channel.addConfirmListener()異步監(jiān)聽發(fā)送方確認(rèn)模式;

        方式一:普通Confirm模式

// 創(chuàng)建連接
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUsername(config.UserName);
        factory.setPassword(config.Password);
        factory.setVirtualHost(config.VHost);
        factory.setHost(config.Host);
        factory.setPort(config.Port);
        Connection conn = factory.newConnection();
// 創(chuàng)建信道
        Channel channel = conn.createChannel();
// 聲明隊(duì)列
        channel.queueDeclare(config.QueueName, false, false, false, null);
// 開啟發(fā)送方確認(rèn)模式
        channel.confirmSelect();
        String message = String.format("時(shí)間 => %s", new Date().getTime());
        channel.basicPublish("", config.QueueName, null, message.getBytes("UTF-8"));
        if (channel.waitForConfirms()) {
            System.out.println("消息發(fā)送成功" );
        }
        看代碼可以知道,我們只需要在推送消息之前,channel.confirmSelect()聲明開啟發(fā)送方確認(rèn)模式,再使用channel.waitForConfirms()等待消息被服務(wù)器確認(rèn)即可。

        方式二:批量Confirm模式

// 創(chuàng)建連接
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUsername(config.UserName);
        factory.setPassword(config.Password);
        factory.setVirtualHost(config.VHost);
        factory.setHost(config.Host);
        factory.setPort(config.Port);
        Connection conn = factory.newConnection();
// 創(chuàng)建信道
        Channel channel = conn.createChannel();
// 聲明隊(duì)列
        channel.queueDeclare(config.QueueName, false, false, false, null);
// 開啟發(fā)送方確認(rèn)模式
        channel.confirmSelect();
        for (int i = 0; i < 10; i++) {
            String message = String.format("時(shí)間 => %s", new Date().getTime());
            channel.basicPublish("", config.QueueName, null, message.getBytes("UTF-8"));
        }
        channel.waitForConfirmsOrDie(); //直到所有信息都發(fā)布,只要有一個(gè)未確認(rèn)就會(huì)IOException
        System.out.println("全部執(zhí)行完成");
        以上代碼可以看出來(lái)channel.waitForConfirmsOrDie(),使用同步方式等所有的消息發(fā)送之后才會(huì)執(zhí)行后面代碼,只要有一個(gè)消息未被確認(rèn)就會(huì)拋出IOException異常。

        方式三:異步Confirm模式

// 創(chuàng)建連接
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUsername(config.UserName);
        factory.setPassword(config.Password);
        factory.setVirtualHost(config.VHost);
        factory.setHost(config.Host);
        factory.setPort(config.Port);
        Connection conn = factory.newConnection();
// 創(chuàng)建信道
        Channel channel = conn.createChannel();
// 聲明隊(duì)列
        channel.queueDeclare(config.QueueName, false, false, false, null);
// 開啟發(fā)送方確認(rèn)模式
        channel.confirmSelect();
        for (int i = 0; i < 10; i++) {
            String message = String.format("時(shí)間 => %s", new Date().getTime());
            channel.basicPublish("", config.QueueName, null, message.getBytes("UTF-8"));
        }
//異步監(jiān)聽確認(rèn)和未確認(rèn)的消息
        channel.addConfirmListener(new ConfirmListener() {
            @Override
            public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                System.out.println("未確認(rèn)消息,標(biāo)識(shí):" + deliveryTag);
            }
            @Override
            public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                System.out.println(String.format("已確認(rèn)消息,標(biāo)識(shí):%d,多個(gè)消息:%b", deliveryTag, multiple));
            }
        });

rabbitmq 消息分發(fā)

RabbitMQ 隊(duì)列擁有多個(gè)消費(fèi)者時(shí) ,隊(duì)列收到的消息將以輪詢 (round-robin )的分發(fā)方發(fā)送給消費(fèi)者。每條消息只會(huì)發(fā)送給訂閱列表里的一個(gè)消費(fèi)者。這種方式非常適合擴(kuò)展,而它是專門為并發(fā)程序設(shè)計(jì)的。如果現(xiàn)在負(fù)載加重,那么只需要?jiǎng)?chuàng)建更多的消費(fèi)者來(lái)消費(fèi)處理消息即可。
        很多時(shí)候輪詢的分發(fā)機(jī)制也不是那么優(yōu)雅。默認(rèn)情況下,如果有 個(gè)消費(fèi)者,那么 RabbitM會(huì)將第 條消息分發(fā)給第 m%n (取余的方式)個(gè)消費(fèi)者, RabbitMQ 不管消費(fèi)者是否消費(fèi)并己
        經(jīng)確認(rèn) (Basic.Ack) 了消息。試想一下,如果某些消費(fèi)者任務(wù)繁重,來(lái)不及消費(fèi)那么多的消
        息,而某些其他消費(fèi)者由于某些原因(比如業(yè)務(wù)邏輯簡(jiǎn)單、機(jī)器性能卓越等)很快地處理完了
        所分配到的消息,進(jìn)而進(jìn)程空閑,這樣就會(huì)造成整體應(yīng)用吞吐量的下降。
        那么該如何處理這種情況呢?這里就要用到 channel.basicQos(int prefetchCoun這個(gè)方法,如前面章節(jié)所述, channel.basicQos 方法允許限制信道上的消費(fèi)者所能保持的最未確認(rèn)消息的數(shù)量。
        舉例說(shuō)明,在訂閱消費(fèi)隊(duì)列之前,消費(fèi)端程序調(diào)用了 channel.basicQos(5) ,之后閱了某個(gè)隊(duì)列進(jìn)行消費(fèi)。 RabbitM 會(huì)保存一個(gè)消費(fèi)者的列表,每發(fā)送一條消息都會(huì)為對(duì)應(yīng)的費(fèi)者計(jì)數(shù),如果達(dá)到了所設(shè)定的上限,那么 RabbitMQ 就不會(huì)向這個(gè)消費(fèi)者再發(fā)送任何消息。
        直到消費(fèi)者確認(rèn)了某條消息之后 RabbitMQ 將相應(yīng)的計(jì)數(shù)減1,之后消費(fèi)者可以繼續(xù)接收消息,
        直到再次到達(dá)計(jì)數(shù)上限。這種機(jī)制可以類比于 TCP!IP中的"滑動(dòng)窗口"
        注意要點(diǎn):
        Basic.Qos 的使用對(duì)于拉模式的消費(fèi)方式無(wú)效.
        channel.basicQos 有三種類型的重載方法:
        (1) void basicQos(int prefetchCount) throws IOException;
        (2) void basicQos( nt prefetchCount , boo1ean globa1) throws IOExcepti(3) void basicQos(int prefetchSize , int prefetchCount , boo1ean global) IOException ;
        前面介紹的都只用到了 prefetchCount 這個(gè)參數(shù),當(dāng) prefetchCount 設(shè)置 沒(méi)有上限。還有 prefetchSize 這個(gè)參數(shù)表示消費(fèi)者所能接收未確認(rèn)消息的總體大小的上單位為 ,設(shè)置為 則表示沒(méi)有上限。
        對(duì)于 個(gè)信道來(lái)說(shuō),它可以同時(shí)消費(fèi)多個(gè)隊(duì)列,當(dāng)設(shè)置了 prefetchCount 大于 時(shí)信道需要和各個(gè)隊(duì)列協(xié)調(diào)以確保發(fā)送的消息都沒(méi)有超過(guò)所限定的 prefetchCount 的值,RabbitM 的性能降低,尤其是這些隊(duì)列分散在集群中的多個(gè) Broker 節(jié)點(diǎn)之中。Rabbit提升相關(guān)的性能,在 AMQPO-9-1 協(xié)議之上重新定義了 global 這個(gè)參數(shù),對(duì)比如表 4- 所4-1 global 參數(shù)的對(duì)比
        global 參數(shù)
        false:信道上新的消費(fèi)者需要遵從 prefetchCount 的限定值true:信道上所有的消費(fèi)者都需要遵從 prefetchCount的限定值

rabbitmq 消息順序消費(fèi):

        rabbitmq重發(fā),死信隊(duì)列,延時(shí),及網(wǎng)絡(luò)閃斷都會(huì)造成生產(chǎn)順序亂序,故不支持順序消費(fèi)。

        可以在業(yè)務(wù)上自定義排序值,當(dāng)接受到的排序值與下一個(gè)預(yù)估要消費(fèi)的值不一致是,就等待。

rabbitmq 消息可靠性保障:

消息可靠傳輸一般是業(yè)務(wù)系統(tǒng)接入消息中間件時(shí)首要考慮的問(wèn)題,一般消息中間件的消傳輸保障分為三個(gè)層級(jí)。
{> At most once: 最多一次。消息可能會(huì)丟失,但絕不會(huì)重復(fù)傳輸
    At least once: 最少一次。消息絕不會(huì)丟失,但可能會(huì)重復(fù)傳輸。
    Exactly once: 恰好一次。每條消息肯定會(huì)被傳輸一次且僅傳輸一次。
    RabbitMQ 支持其中的"最多一次 "和"最少一次"。其中"最少 次"投遞實(shí)現(xiàn)需要考以下這個(gè)幾個(gè)方面的內(nèi)容:
(1)消息生產(chǎn)者需要開啟事務(wù)機(jī)制或者 publisher confirm 機(jī)制,以確保消 息可以可靠地輸?shù)?nbsp;RabbitMQ 中。
    (2) 消息生產(chǎn)者需要配合使用 mandatory 參數(shù)或者備份交換器來(lái)確保消息能夠從交換路由到隊(duì)列中,進(jìn)而能夠保存下來(lái)而不會(huì)被丟棄。
    3) 消息和隊(duì)列都需要進(jìn)行持久化處理,以確保 RabbitMQ 服務(wù)器在遇到異常情況時(shí)不? 90 ? Rabbi{MQ 進(jìn)階
    造成消息丟失
            (4) 消費(fèi)者在消費(fèi)消息的同時(shí)需要將 autoAck 設(shè)置為 false ,然后通過(guò)手動(dòng)確認(rèn)的方式去
    確認(rèn)己經(jīng)正確消費(fèi)的消息,以避免在消費(fèi)端引起不必要的消息丟失。
    "最多 次"的方式就無(wú)須考慮以上那些方面,生產(chǎn)者隨意發(fā)送,消費(fèi)者隨意消費(fèi),不過(guò)這
            樣很難確保消息不會(huì)丟失
    "恰好 次"是 RabbitMQ 目前無(wú)法保障的??紤]這樣一種情況,消費(fèi)者在消費(fèi)完一條消息
    之后向 RabbitMQ 發(fā)送確認(rèn) Basic.Ack 命令,此時(shí)由于網(wǎng)絡(luò)斷開或者其他原因造成 RabbitMQ
    并沒(méi)有收到這個(gè)確認(rèn)命令,那么 RabbitMQ 不會(huì)將此條消息標(biāo)記刪除。在重新建立連接之后,
    消費(fèi)者還是會(huì)消費(fèi)到這 條消息,這就造成了重復(fù)消費(fèi)。再考慮 種情況,生產(chǎn)者在使用
    ublisher confirm 機(jī)制的時(shí)候,發(fā)送完 條消息等待 RabbitMQ 返回確認(rèn)通知,此時(shí)網(wǎng)絡(luò)斷開,
    生產(chǎn)者捕獲到異常情況,為了確保消息可靠性選擇重新發(fā)送,這樣 RabbitMQ 中就有兩條同樣
    的消息,在消費(fèi)的時(shí)候,消費(fèi)者就會(huì)重復(fù)消費(fèi)
    那么 RabbitMQ 有沒(méi)有去重的機(jī)制來(lái)保證"恰好一次"呢?答案是并沒(méi)有,不僅是 RabbitMQ
    目前大多數(shù) 流的消息中間件都沒(méi)有消息去重機(jī)制,也不保障"恰好 次"。去重處理 般是在
    業(yè)務(wù)客戶端實(shí)現(xiàn),比如引入 GUID (Globally Unique Identifier) 的概念。針對(duì) GUID ,如果從客
    戶端的角度去 ,那么 要引入集中式緩存,必然會(huì)增加依賴復(fù)雜度,另外緩存的大小也難以
    界定 建議在實(shí)際生產(chǎn)環(huán)境中,業(yè)務(wù)方根據(jù)自身的業(yè)務(wù)特性進(jìn)行去重,比如業(yè)務(wù)消息本身具備
    等'性,或者借助 Redis 等其他產(chǎn)品進(jìn)行去重處理。

到此,關(guān)于“rabbitmq的事務(wù)機(jī)制”的學(xué)習(xí)就結(jié)束了,希望能夠解決大家的疑惑。理論與實(shí)踐的搭配能更好的幫助大家學(xué)習(xí),快去試試吧!若想繼續(xù)學(xué)習(xí)更多相關(guān)知識(shí),請(qǐng)繼續(xù)關(guān)注億速云網(wǎng)站,小編會(huì)繼續(xù)努力為大家?guī)?lái)更多實(shí)用的文章!

向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI