您好,登錄后才能下訂單哦!
本篇文章為大家展示了在Rabbitmq中利用heartbea實(shí)現(xiàn)心跳檢測(cè)機(jī)制的原理是什么,內(nèi)容簡(jiǎn)明扼要并且容易理解,絕對(duì)能使你眼前一亮,通過(guò)這篇文章的詳細(xì)介紹希望你能有所收獲。
前言
使用rabbitmq的時(shí)候,當(dāng)你客戶(hù)端與rabbitmq服務(wù)器之間一段時(shí)間沒(méi)有流量,服務(wù)器將會(huì)斷開(kāi)與客戶(hù)端之間tcp連接。
而你將在服務(wù)器上看這樣的日志:
missed heartbeats from client, timeout: xxs
這個(gè)間隔時(shí)間就是心跳間隔。
heartbeat通常用來(lái)檢測(cè)通信的對(duì)端是否存活(未正常關(guān)閉socket連接而異常crash)。其基本原理是檢測(cè)對(duì)應(yīng)的socket連接上數(shù)據(jù)的收發(fā)是否正常,如果一段時(shí)間內(nèi)沒(méi)有收發(fā)數(shù)據(jù),則向?qū)Χ税l(fā)送一個(gè)心跳檢測(cè)包,如果一段時(shí)間內(nèi)沒(méi)有回應(yīng)則認(rèn)為心跳超時(shí),即認(rèn)為對(duì)端可能異常crash了。
rabbitmq也不例外,heatbeat在客戶(hù)端和服務(wù)端之間用于檢測(cè)對(duì)端是否正常,即客戶(hù)端與服務(wù)端之間的tcp鏈接是否正常。
關(guān)于rabbitmq心跳
1.heartbeat檢測(cè)時(shí)間間隔可在配置文件rabbitmq.config中增加配置項(xiàng){heartbeat,Timeout}進(jìn)行配置,其中Timeout指定時(shí)間間隔,單位為秒,另外客戶(hù)端也可以配置heartbeat時(shí)間。
如果服務(wù)端沒(méi)有配置
默認(rèn)代理心跳時(shí)間:
RabbitMQ 3.2.2:580秒
RabbitMQ 3.5.5:60秒
2.官方建議不要禁用心跳,且建議心跳時(shí)間為60秒。
3.心跳每 heartbeat timeout / 2 秒發(fā)送一次,服務(wù)器兩次沒(méi)有接收到則斷開(kāi)tcp連接,以前的連接將失效,客戶(hù)端需要重新連接。
4.如果你使用Java, .NET and Erlang clients,服務(wù)器與客戶(hù)端會(huì)協(xié)商heartbeat時(shí)間
如果其中一個(gè)值為0,則使用兩者中較大的一個(gè)
否則,使用兩者中較小的一個(gè)
兩個(gè)值都為0,則表示要禁用心跳,則服務(wù)端與客戶(hù)端維持此tcp連接,不會(huì)斷開(kāi)。
注意:在python客戶(hù)端上直接設(shè)置為0,則禁用心跳。
禁用心跳在python客戶(hù)端該如何設(shè)置:
在py3:ConnectionParameters設(shè)置heartbeat_interval=0即可。
在py2:ConnectionParameters設(shè)置heartbeat=0即可。
5.連接上的任何流量(傳輸?shù)挠行?shù)據(jù)、確認(rèn)等)都將被計(jì)入有效心跳,當(dāng)然也包括心跳幀。
6.我在網(wǎng)上看到有人問(wèn)到這個(gè)問(wèn)題:
為什么服務(wù)端宕機(jī),在心跳檢測(cè)機(jī)制下,服務(wù)器側(cè)斷開(kāi)連接,而客戶(hù)端這邊不能檢測(cè)到tcp斷開(kāi),我測(cè)試過(guò),客戶(hù)端確實(shí)不能檢測(cè)到tcp連接斷開(kāi),只有當(dāng)客戶(hù)端在這個(gè)tcp有操作后,才能檢測(cè)到,當(dāng)然在一個(gè)斷開(kāi)的tcp連接上做操作會(huì)報(bào)錯(cuò)(如發(fā)送消息)。
import pika import time credit = pika.PlainCredentials(username='cloud', password='cloud') connection = pika.BlockingConnection(pika.ConnectionParameters( host='10.32.1.12', credentials=credit)) channel = connection.channel() while True: connect_close = connection.is_closed connect_open = connection.is_open channel_close = channel.is_closed channel_open = channel.is_open print("connection is_closed ", connect_close) print("connection is_open ", connect_open) print("channel is_closed ", channel_close) print("channel is_open ", channel_open) print("") time.sleep(5)
7.一些RabbitMQ客戶(hù)端(Bunny,Java,.NET,Objective-C,Swift)提供了一種在網(wǎng)絡(luò)故障后自動(dòng)恢復(fù)連接的機(jī)制,而pika只能通過(guò)檢測(cè)連接異常后再重新創(chuàng)建連接的方式。
示例代碼:通過(guò)檢測(cè)連接異常,重新創(chuàng)建連接:
import pika while True: try: connection = pika.BlockingConnection() channel = connection.channel() channel.basic_consume('test', on_message_callback) channel.start_consuming() # Don't recover if connection was closed by broker except pika.exceptions.ConnectionClosedByBroker: break # Don't recover on channel errors except pika.exceptions.AMQPChannelError: break # Recover on all other connection errors except pika.exceptions.AMQPConnectionError: continue
你也可以使用操作重試庫(kù),例如 retry。
from retry import retry @retry(pika.exceptions.AMQPConnectionError, delay=5, jitter=(1, 3)) def consume(): connection = pika.BlockingConnection() channel = connection.channel() channel.basic_consume('test', on_message_callback) try: channel.start_consuming() # Don't recover connections closed by server except pika.exceptions.ConnectionClosedByBroker: pass consume()
heartbeat的實(shí)現(xiàn)
rabbitmq在收到來(lái)自客戶(hù)端的connection.tune-ok信令后,啟用心跳檢測(cè),rabbitmq會(huì)為每個(gè)tcp連接創(chuàng)建兩個(gè)進(jìn)程用于心跳檢測(cè),一個(gè)進(jìn)程定時(shí)檢測(cè)tcp連接上是否有數(shù)據(jù)發(fā)送(這里的發(fā)送是指rabbitmq發(fā)送數(shù)據(jù)給客戶(hù)端),如果一段時(shí)間內(nèi)沒(méi)有數(shù)據(jù)發(fā)送給客戶(hù)端,則發(fā)送一個(gè)心跳包給客戶(hù)端,然后循環(huán)進(jìn)行下一次檢測(cè);另一個(gè)進(jìn)程定時(shí)檢測(cè)tcp連接上是否有數(shù)據(jù)的接收,如果一段時(shí)間內(nèi)沒(méi)有收到任何數(shù)據(jù),則判定為心跳超時(shí),最終會(huì)關(guān)閉tcp連接。另外,rabbitmq的流量控制機(jī)制可能會(huì)暫停heartbeat檢測(cè),這里不展開(kāi)描述。
涉及的源碼:
start(SupPid, Sock, SendTimeoutSec, SendFun, ReceiveTimeoutSec, ReceiveFun) -> %%數(shù)據(jù)發(fā)送檢測(cè)進(jìn)程 {ok, Sender} = start_heartbeater(SendTimeoutSec, SupPid, Sock, SendFun, heartbeat_sender, start_heartbeat_sender), %%數(shù)據(jù)接收檢測(cè)進(jìn)程 {ok, Receiver} = start_heartbeater(ReceiveTimeoutSec, SupPid, Sock, ReceiveFun, heartbeat_receiver, start_heartbeat_receiver), {Sender, Receiver}. start_heartbeat_sender(Sock, TimeoutSec, SendFun) -> %% the 'div 2' is there so that we don't end up waiting for %% nearly 2 * TimeoutSec before sending a heartbeat in the %% boundary case heartbeater({Sock, TimeoutSec * 1000 div 2, send_oct, 0, fun () -> SendFun(), continue end}). start_heartbeat_receiver(Sock, TimeoutSec, ReceiveFun) -> %% we check for incoming data every interval, and time out after %% two checks with no change. As a result we will time out %% between 2 and 3 intervals after the last data has been %% received heartbeater({Sock, TimeoutSec * 1000, recv_oct, 1, fun () -> ReceiveFun(), stop end}). heartbeater({Sock, TimeoutMillisec, StatName, Threshold, Handler} = Params, Deb, {StatVal, SameCount} = State) -> Recurse = fun (State1) -> heartbeater(Params, Deb, State1) end, receive ... %% 定時(shí)檢測(cè) after TimeoutMillisec -> case rabbit_net:getstat(Sock, [StatName]) of {ok, [{StatName, NewStatVal}]} -> %% 收發(fā)數(shù)據(jù)有變化 if NewStatVal =/= StatVal -> %%重新開(kāi)始檢測(cè) Recurse({NewStatVal, 0}); %%未達(dá)到指定次數(shù), 發(fā)送為0, 接收為1 SameCount < Threshold -> %%計(jì)數(shù)加1, 再次檢測(cè) Recurse({NewStatVal, SameCount + 1}); %%heartbeat超時(shí) true -> %%對(duì)于發(fā)送檢測(cè)超時(shí), 向客戶(hù)端發(fā)送heartbeat包 %%對(duì)于接收檢測(cè)超時(shí), 向父進(jìn)程發(fā)送超時(shí)通知 %%由父進(jìn)程觸發(fā)tcp關(guān)閉等操作 case Handler() of %%接收檢測(cè)超時(shí) stop -> ok; %%發(fā)送檢測(cè)超時(shí) continue -> Recurse({NewStatVal, 0}) end; ...
收發(fā)檢測(cè)的時(shí)候利用了inet模塊的getstat,查看socket的統(tǒng)計(jì)信息
recv_oct: 查看socket上接收的字節(jié)數(shù)
send_oct: 查看socket上發(fā)送的字節(jié)數(shù)
上述內(nèi)容就是在Rabbitmq中利用heartbea實(shí)現(xiàn)心跳檢測(cè)機(jī)制的原理是什么,你們學(xué)到知識(shí)或技能了嗎?如果還想學(xué)到更多技能或者豐富自己的知識(shí)儲(chǔ)備,歡迎關(guān)注億速云行業(yè)資訊頻道。
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。