溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

RabbitMQ如何實現(xiàn)RPC遠(yuǎn)程調(diào)用消息隊列

發(fā)布時間:2021-11-23 10:24:11 來源:億速云 閱讀:477 作者:小新 欄目:開發(fā)技術(shù)

這篇文章主要介紹了RabbitMQ如何實現(xiàn)RPC遠(yuǎn)程調(diào)用消息隊列,具有一定借鑒價值,感興趣的朋友可以參考下,希望大家閱讀完這篇文章之后大有收獲,下面讓小編帶著大家一起了解一下。

  客戶端接口

  我們創(chuàng)建一個客戶端類來說明如何使用RPC服務(wù),暴露一個call方法來發(fā)送RPC請求和數(shù)據(jù)獲取結(jié)果。

FibonacciRpcClient fibonacciRpc = new FibonacciRpcClient();  
String result = fibonacciRpc.call("4");
System.out.println( "fib(4) is " + result);

  盡管RPC是編程中一種常見的模式,但其也常常飽受批評。因為程序員常常不知道調(diào)用的方法是本地方法還是一個RPC方法,這在調(diào)試中常常增加一些不必要的復(fù)雜性。我們應(yīng)該簡化代碼,而不是濫用RPC導(dǎo)致代碼變的臃腫。

  回調(diào)隊列

  一般來說,通過RabbitMQ實現(xiàn)RPC非常簡單,客戶端發(fā)送一個請求消息,服務(wù)端響應(yīng)消息就完成了。為了接收到響應(yīng)內(nèi)容,我們在請求中發(fā)送”callback“隊列地址,也可以使用默認(rèn)的隊列。

callbackQueueName = channel.queueDeclare().getQueue();
BasicProperties props = new BasicProperties .Builder().replyTo(callbackQueueName) .build(); 
channel.basicPublish("", "rpc_queue", props, message.getBytes());

  AMQP協(xié)議中預(yù)定了14個消息屬性,除了下面幾個,其它的都很少使用:

  deliveryMode : 標(biāo)識消息是持久化還是瞬態(tài)的。

  contentType : 描述 mime-type的編碼類型,如JSON編碼為”application/json“。

  replyTo : 通常在回調(diào)隊列中使用。

  correlationId : 在請求中關(guān)聯(lián)RPC響應(yīng)時使用。

  關(guān)聯(lián)Id(Correlation Id)

  在前面的方法中,要求在每個RPC請求創(chuàng)建回調(diào)隊列,這可真是一件繁瑣的事情,但幸運的是我們有個好方法-在每個客戶端創(chuàng)建一個簡單的回調(diào)隊列。

  這樣問題又來了,隊列如何知道這些響應(yīng)來自哪個請求呢?這時候correlationId就出場了。我們在每個請求中都設(shè)置一個唯一的值,這樣我們在回調(diào)隊列中接收消息的時候就能知道是哪個請求發(fā)送的。如果收到未知的correlationId,就廢棄該消息,因為它不是我們發(fā)出的請求。

  你可能會問,為什么拋棄未知消息而不是拋出錯誤呢?這是由服務(wù)器競爭資源所導(dǎo)致的。盡管這不太可能,試想一下,如果RPC服務(wù)器在發(fā)送完響應(yīng)后而在發(fā)送應(yīng)答消息前死掉了,重啟RPC服務(wù)器會重新發(fā)送請求。這就是我們在客戶機上優(yōu)雅地處理重復(fù)的反應(yīng),RPC應(yīng)該是等同的。

RabbitMQ如何實現(xiàn)RPC遠(yuǎn)程調(diào)用消息隊列

 ?。?)客戶端啟動,創(chuàng)建一個匿名且唯一的回調(diào)隊列。

 ?。?)對每個RPC請求,客戶端發(fā)送一個包含replyTo和correlationId兩個屬性的消息。

 ?。?)請求發(fā)送到rpc_queue隊列。

 ?。?)RPC服務(wù)在隊列中等待請求,當(dāng)請求出現(xiàn)時,根據(jù)replyTo字段使用隊列將結(jié)果發(fā)送到客戶端。

 ?。?)客戶端在回調(diào)隊列中等待數(shù)據(jù)。當(dāng)消息出現(xiàn)時,它會檢查correlationId屬性,如果該值匹配的話,就會返回響應(yīng)結(jié)果給應(yīng)用。

  示例代碼

  RPCServer.java

package com.favccxx.favrabbit;

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.AMQP.BasicProperties;

public class RPCServer {

	private static final String RPC_QUEUE_NAME = "rpc_queue";

	private static int fib(int n) {
		if (n == 0)
			return 0;
		if (n == 1)
			return 1;
		return fib(n - 1) + fib(n - 2);
	}

	public static void main(String[] argv) {
		Connection connection = null;
		Channel channel = null;
		try {
			ConnectionFactory factory = new ConnectionFactory();
			factory.setHost("localhost");

			connection = factory.newConnection();
			channel = connection.createChannel();

			channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);

			channel.basicQos(1);

			QueueingConsumer consumer = new QueueingConsumer(channel);
			channel.basicConsume(RPC_QUEUE_NAME, false, consumer);

			System.out.println(" [x] Awaiting RPC requests");

			while (true) {
				String response = null;

				QueueingConsumer.Delivery delivery = consumer.nextDelivery();

				BasicProperties props = delivery.getProperties();
				BasicProperties replyProps = new BasicProperties.Builder().correlationId(props.getCorrelationId())
						.build();

				try {
					String message = new String(delivery.getBody(), "UTF-8");
					int n = Integer.parseInt(message);

					System.out.println(" [.] fib(" + message + ")");
					response = "" + fib(n);
				} catch (Exception e) {
					System.out.println(" [.] " + e.toString());
					response = "";
				} finally {
					channel.basicPublish("", props.getReplyTo(), replyProps, response.getBytes("UTF-8"));

					channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
				}
			}
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			if (connection != null) {
				try {
					connection.close();
				} catch (Exception ignore) {
				}
			}
		}
	}
}

  RPCClient.java       

package com.favccxx.favrabbit;

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.AMQP.BasicProperties;
import java.util.UUID;

public class RPCClient {

	private Connection connection;
	private Channel channel;
	private String requestQueueName = "rpc_queue";
	private String replyQueueName;
	private QueueingConsumer consumer;

	public RPCClient() throws Exception {
		ConnectionFactory factory = new ConnectionFactory();
		factory.setHost("localhost");
		connection = factory.newConnection();
		channel = connection.createChannel();

		replyQueueName = channel.queueDeclare().getQueue();
		consumer = new QueueingConsumer(channel);
		channel.basicConsume(replyQueueName, true, consumer);
	}

	public String call(String message) throws Exception {
		String response = null;
		String corrId = UUID.randomUUID().toString();

		BasicProperties props = new BasicProperties.Builder().correlationId(corrId).replyTo(replyQueueName).build();

		channel.basicPublish("", requestQueueName, props, message.getBytes("UTF-8"));

		while (true) {
			QueueingConsumer.Delivery delivery = consumer.nextDelivery();
			if (delivery.getProperties().getCorrelationId().equals(corrId)) {
				response = new String(delivery.getBody(), "UTF-8");
				break;
			}
		}

		return response;
	}

	public void close() throws Exception {
		connection.close();
	}

	public static void main(String[] argv) {
		RPCClient fibonacciRpc = null;
		String response = null;
		try {
			fibonacciRpc = new RPCClient();

			System.out.println(" [x] Requesting fib(30)");
			response = fibonacciRpc.call("30");
			System.out.println(" [.] Got '" + response + "'");
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			if (fibonacciRpc != null) {
				try {
					fibonacciRpc.close();
				} catch (Exception ignore) {
				}
			}
		}
	}
}

  先啟動RPCServer,然后運行RPCClient,控制臺輸出如下內(nèi)容

RPCClient[x] Requesting fib(30)

RPCClient[.] Got '832040'


RPCServer[x] Awaiting RPC requests

RPCServer[.] fib(30)



感謝你能夠認(rèn)真閱讀完這篇文章,希望小編分享的“RabbitMQ如何實現(xiàn)RPC遠(yuǎn)程調(diào)用消息隊列”這篇文章對大家有幫助,同時也希望大家多多支持億速云,關(guān)注億速云行業(yè)資訊頻道,更多相關(guān)知識等著你來學(xué)習(xí)!

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI