您好,登錄后才能下訂單哦!
這篇文章主要介紹了RabbitMQ如何實現(xiàn)RPC遠(yuǎn)程調(diào)用消息隊列,具有一定借鑒價值,感興趣的朋友可以參考下,希望大家閱讀完這篇文章之后大有收獲,下面讓小編帶著大家一起了解一下。
客戶端接口
我們創(chuàng)建一個客戶端類來說明如何使用RPC服務(wù),暴露一個call方法來發(fā)送RPC請求和數(shù)據(jù)獲取結(jié)果。
FibonacciRpcClient fibonacciRpc = new FibonacciRpcClient(); String result = fibonacciRpc.call("4"); System.out.println( "fib(4) is " + result);
盡管RPC是編程中一種常見的模式,但其也常常飽受批評。因為程序員常常不知道調(diào)用的方法是本地方法還是一個RPC方法,這在調(diào)試中常常增加一些不必要的復(fù)雜性。我們應(yīng)該簡化代碼,而不是濫用RPC導(dǎo)致代碼變的臃腫。
回調(diào)隊列
一般來說,通過RabbitMQ實現(xiàn)RPC非常簡單,客戶端發(fā)送一個請求消息,服務(wù)端響應(yīng)消息就完成了。為了接收到響應(yīng)內(nèi)容,我們在請求中發(fā)送”callback“隊列地址,也可以使用默認(rèn)的隊列。
callbackQueueName = channel.queueDeclare().getQueue(); BasicProperties props = new BasicProperties .Builder().replyTo(callbackQueueName) .build(); channel.basicPublish("", "rpc_queue", props, message.getBytes());
AMQP協(xié)議中預(yù)定了14個消息屬性,除了下面幾個,其它的都很少使用:
deliveryMode : 標(biāo)識消息是持久化還是瞬態(tài)的。
contentType : 描述 mime-type的編碼類型,如JSON編碼為”application/json“。
replyTo : 通常在回調(diào)隊列中使用。
correlationId : 在請求中關(guān)聯(lián)RPC響應(yīng)時使用。
關(guān)聯(lián)Id(Correlation Id)
在前面的方法中,要求在每個RPC請求創(chuàng)建回調(diào)隊列,這可真是一件繁瑣的事情,但幸運的是我們有個好方法-在每個客戶端創(chuàng)建一個簡單的回調(diào)隊列。
這樣問題又來了,隊列如何知道這些響應(yīng)來自哪個請求呢?這時候correlationId就出場了。我們在每個請求中都設(shè)置一個唯一的值,這樣我們在回調(diào)隊列中接收消息的時候就能知道是哪個請求發(fā)送的。如果收到未知的correlationId,就廢棄該消息,因為它不是我們發(fā)出的請求。
你可能會問,為什么拋棄未知消息而不是拋出錯誤呢?這是由服務(wù)器競爭資源所導(dǎo)致的。盡管這不太可能,試想一下,如果RPC服務(wù)器在發(fā)送完響應(yīng)后而在發(fā)送應(yīng)答消息前死掉了,重啟RPC服務(wù)器會重新發(fā)送請求。這就是我們在客戶機上優(yōu)雅地處理重復(fù)的反應(yīng),RPC應(yīng)該是等同的。
?。?)客戶端啟動,創(chuàng)建一個匿名且唯一的回調(diào)隊列。
?。?)對每個RPC請求,客戶端發(fā)送一個包含replyTo和correlationId兩個屬性的消息。
?。?)請求發(fā)送到rpc_queue隊列。
?。?)RPC服務(wù)在隊列中等待請求,當(dāng)請求出現(xiàn)時,根據(jù)replyTo字段使用隊列將結(jié)果發(fā)送到客戶端。
?。?)客戶端在回調(diào)隊列中等待數(shù)據(jù)。當(dāng)消息出現(xiàn)時,它會檢查correlationId屬性,如果該值匹配的話,就會返回響應(yīng)結(jié)果給應(yīng)用。
示例代碼
RPCServer.java
package com.favccxx.favrabbit; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Channel; import com.rabbitmq.client.QueueingConsumer; import com.rabbitmq.client.AMQP.BasicProperties; public class RPCServer { private static final String RPC_QUEUE_NAME = "rpc_queue"; private static int fib(int n) { if (n == 0) return 0; if (n == 1) return 1; return fib(n - 1) + fib(n - 2); } public static void main(String[] argv) { Connection connection = null; Channel channel = null; try { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); connection = factory.newConnection(); channel = connection.createChannel(); channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null); channel.basicQos(1); QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(RPC_QUEUE_NAME, false, consumer); System.out.println(" [x] Awaiting RPC requests"); while (true) { String response = null; QueueingConsumer.Delivery delivery = consumer.nextDelivery(); BasicProperties props = delivery.getProperties(); BasicProperties replyProps = new BasicProperties.Builder().correlationId(props.getCorrelationId()) .build(); try { String message = new String(delivery.getBody(), "UTF-8"); int n = Integer.parseInt(message); System.out.println(" [.] fib(" + message + ")"); response = "" + fib(n); } catch (Exception e) { System.out.println(" [.] " + e.toString()); response = ""; } finally { channel.basicPublish("", props.getReplyTo(), replyProps, response.getBytes("UTF-8")); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } } } catch (Exception e) { e.printStackTrace(); } finally { if (connection != null) { try { connection.close(); } catch (Exception ignore) { } } } } }
RPCClient.java
package com.favccxx.favrabbit; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Channel; import com.rabbitmq.client.QueueingConsumer; import com.rabbitmq.client.AMQP.BasicProperties; import java.util.UUID; public class RPCClient { private Connection connection; private Channel channel; private String requestQueueName = "rpc_queue"; private String replyQueueName; private QueueingConsumer consumer; public RPCClient() throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); connection = factory.newConnection(); channel = connection.createChannel(); replyQueueName = channel.queueDeclare().getQueue(); consumer = new QueueingConsumer(channel); channel.basicConsume(replyQueueName, true, consumer); } public String call(String message) throws Exception { String response = null; String corrId = UUID.randomUUID().toString(); BasicProperties props = new BasicProperties.Builder().correlationId(corrId).replyTo(replyQueueName).build(); channel.basicPublish("", requestQueueName, props, message.getBytes("UTF-8")); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); if (delivery.getProperties().getCorrelationId().equals(corrId)) { response = new String(delivery.getBody(), "UTF-8"); break; } } return response; } public void close() throws Exception { connection.close(); } public static void main(String[] argv) { RPCClient fibonacciRpc = null; String response = null; try { fibonacciRpc = new RPCClient(); System.out.println(" [x] Requesting fib(30)"); response = fibonacciRpc.call("30"); System.out.println(" [.] Got '" + response + "'"); } catch (Exception e) { e.printStackTrace(); } finally { if (fibonacciRpc != null) { try { fibonacciRpc.close(); } catch (Exception ignore) { } } } } }
先啟動RPCServer,然后運行RPCClient,控制臺輸出如下內(nèi)容
RPCClient[x] Requesting fib(30) RPCClient[.] Got '832040' RPCServer[x] Awaiting RPC requests RPCServer[.] fib(30) |
感謝你能夠認(rèn)真閱讀完這篇文章,希望小編分享的“RabbitMQ如何實現(xiàn)RPC遠(yuǎn)程調(diào)用消息隊列”這篇文章對大家有幫助,同時也希望大家多多支持億速云,關(guān)注億速云行業(yè)資訊頻道,更多相關(guān)知識等著你來學(xué)習(xí)!
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。