溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

RabbitMQ延遲隊列如何實現(xiàn)訂單支付結(jié)果異步階梯性通知

發(fā)布時間:2022-03-04 09:43:03 來源:億速云 閱讀:164 作者:小新 欄目:開發(fā)技術(shù)

小編給大家分享一下RabbitMQ延遲隊列如何實現(xiàn)訂單支付結(jié)果異步階梯性通知,希望大家閱讀完這篇文章之后都有所收獲,下面讓我們一起去探討吧!

在第三方支付中,例如支付寶、或者微信,對于訂單請求,第三方支付系統(tǒng)采用的是消息同步返回、異步通知+主動補償查詢的補償機制。

由于互聯(lián)網(wǎng)通信的不可靠性,例如雙方網(wǎng)絡(luò)、服務(wù)器、應(yīng)用等因素的影響,不管是同步返回、異步通知、主動查詢報文都可能出現(xiàn)超時無響應(yīng)、報文丟失等情況,所以像支付業(yè)務(wù),對結(jié)果的通知一般采用幾種方案結(jié)合的補償機制,不能完全依賴某一種機制。
例如一個支付結(jié)果的通知,一方面會在支付頁面跳轉(zhuǎn)時候返回支付結(jié)果(一般只用作前端展示使用,非最終狀態(tài)),同時會采用后臺異步通知機制(有前臺、后臺通知的,以后臺異步通知結(jié)果為準(zhǔn)),但由于前臺跳轉(zhuǎn)、后臺結(jié)果通知都可能失效,因此還以定時補單+請求方主動查詢接口作為輔助手段。

常見的補單操作,任務(wù)調(diào)度策略一般設(shè)定30秒、60秒、3分鐘、6分鐘、10分鐘調(diào)度多次(以自己業(yè)務(wù)需要),如果調(diào)度接收到響應(yīng)確認(rèn)報文,補單成功,則中止對應(yīng)訂單的調(diào)度任務(wù);如果超過補單上限次數(shù),則停止補單,避免無謂的資源浪費。請求端隨時可以發(fā)起請求報文查詢對應(yīng)訂單的狀態(tài)。在日常開發(fā)中,對于網(wǎng)站前端來說,支付計費中心對于訂單請求信息的處理也是通過消息同步返回、異步通知+主動補償查詢相結(jié)合的機制,其中對于訂單的異步通知,目前的通知策略為3s、30s、60s、120s、180、300s的階梯性通知。返回成功情況下就不繼續(xù)通知了,本來打算使用將失敗的消息寫到數(shù)據(jù)庫等待發(fā)送,然后每秒查詢數(shù)據(jù)庫獲取消息通知前端。但覺得這樣的處理方式太粗暴。

存在以下缺點:

1 、每秒請求有點兒浪費資源;

2 、通知方式不穩(wěn)定;

3 、無法承受大數(shù)據(jù)量等等

所以最終打算使用rabbitmq的消息延遲+死信隊列來實現(xiàn)。消息模型如下:

RabbitMQ延遲隊列如何實現(xiàn)訂單支付結(jié)果異步階梯性通知

producer發(fā)布消息,通過exchangeA的消息會被分發(fā)到QueueA,Consumer監(jiān)聽queueA,一旦有消息到來就被消費,這邊的消費業(yè)務(wù)就是通知前端,如果通知失敗,就創(chuàng)建一個延遲隊列declareQueue,設(shè)置每個消息的ttl然后通過declare_exchange將消息分發(fā)到declare_queue,因為declare_queue沒有consumer并且declare_queue中的消息設(shè)置了ttl,當(dāng)ttl到期后,將通過DEX路由到queueA,被重新消費。代碼如下:DeclareQueue.java

package org.delayQueue;
	
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
 
public class DeclareQueue {
	public static String EXCHANGE_NAME = "notifyExchange";
	public static void init() {
		ConnectionFactory factory = new ConnectionFactory();
		factory.setHost("localhost");
		factory.setPort(5672);
		Connection connection = null;
		try {
			connection = factory.newConnection();
			Channel channel = connection.createChannel();
			channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
			String routingKey = "AliPaynotify";
			String message = "http://localhost:8080/BossCenter/payGateway/notifyRecv.jsp?is_success=T?ify_id=4ab9bed148d043d0bf75460706f7774a?ify_time=2014-08-29+16%3A22%3A02?ify_type=trade_status_sync&out_trade_no=1421712120109862&total_fee=424.42&trade_no=14217121201098611&trade_status=TRADE_SUCCESS";
			channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes());
			System.out.println(" [x] Sent :" + message);
		} catch (Exception e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		} finally {
			if (connection != null) {
				try {
					connection.close();
				} catch (Exception ignore) {
				}
			}
		}
	}
	public static void main(String args[]) {
		init();
}

DeclareConsumer.java

package org.delayQueue;
 
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import org.apache.http.HttpResponse;
import org.apache.http.client.ClientProtocolException;
import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.impl.client.DefaultHttpClient;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
public class DeclareConsumer {
	public static String EXCHANGE_NAME = "notifyExchange";
	public static String QU_declare_15S = "Qu_declare_15s";
	public static String EX_declare_15S = "EX_declare_15s";
	public static String ROUTINGKEY = "AliPaynotify";
	public static Connection connection = null;
	public static Channel channel = null;
	public static Channel DECLARE_15S_CHANNEL = null;
	public static String declare_queue = "init";
	public static String originalExpiration = "0";
	public static void init() throws Exception {
		ConnectionFactory factory = new ConnectionFactory();
		factory.setHost("localhost");
		factory.setPort(5672);
		connection = factory.newConnection();
		channel = connection.createChannel();
		DECLARE_15S_CHANNEL = connection.createChannel();
	}
	public static void consume() {
		try {
			channel.exchangeDeclare(EXCHANGE_NAME, "topic");
			final String queueName = channel.queueDeclare().getQueue();
			channel.queueBind(queueName, EXCHANGE_NAME, ROUTINGKEY);
			System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
			final Consumer consumer = new DefaultConsumer(channel) {
				@Override
				public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
					String message = new String(body, "UTF-8");
					Map<String, Object> headers = properties.getHeaders();
					if (headers != null) {
						List<Map<String, Object>> xDeath = (List<Map<String, Object>>) headers.get("x-death");
						System.out.println("xDeath--- > " + xDeath);
						if (xDeath != null && !xDeath.isEmpty()) {
							Map<String, Object> entrys = xDeath.get(0);
							// for(Entry<String, Object>
							// entry:entrys.entrySet()){
							// System.out.println(entry.getKey()+":"+entry.getValue());
							// }
							originalExpiration = entrys.get("original-expiration").toString();
						}
					}
					System.out.println(" [x] Received '" + envelope.getRoutingKey() + "':'" + message + "'" + "time" + System.currentTimeMillis());
					HttpClient httpClient = new DefaultHttpClient();
					HttpPost post = new HttpPost(message);
					HttpResponse response = httpClient.execute(post);
					BufferedReader inreader = null;
					if (response.getStatusLine().getStatusCode() == 200) {
						inreader = new BufferedReader(new InputStreamReader(response.getEntity().getContent(), "UTF-8"));
						StringBuffer responseBody = new StringBuffer();
						String line = null;
						while ((line = inreader.readLine()) != null) {
							responseBody.append(line);
						if (!responseBody.equals("success")) {
							// putDeclre15s(message);
							if (originalExpiration.equals("0")) {
								putDeclreQueue(message, 3000, QU_declare_15S);
							}
							if (originalExpiration.equals("3000")) {
								putDeclreQueue(message, 30000, QU_declare_15S);
							if (originalExpiration.equals("30000")) {
								putDeclreQueue(message, 60000, QU_declare_15S);
							if (originalExpiration.equals("60000")) {
								putDeclreQueue(message, 120000, QU_declare_15S);
							if (originalExpiration.equals("120000")) {
								putDeclreQueue(message, 180000, QU_declare_15S);
							if (originalExpiration.equals("180000")) {
								putDeclreQueue(message, 300000, QU_declare_15S);
							if (originalExpiration.equals("300000")) {
//								channel.basicConsume(QU_declare_300S,true, this);
								System.out.println("finish notify");
					} else {
						System.out.println(response.getStatusLine().getStatusCode());
				}
			};
			channel.basicConsume(queueName, true, consumer);
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
		}
	static Map<String, Object> xdeathMap = new HashMap<String, Object>();
	static List<Map<String, Object>> xDeath = new ArrayList<Map<String, Object>>();
	static Map<String, Object> xdeathParam = new HashMap<String, Object>();
	public static void putDeclre15s(String message) throws IOException {
		channel.exchangeDeclare(EX_declare_15S, "topic");
		Map<String, Object> args = new HashMap<String, Object>();
		args.put("x-dead-letter-exchange", EXCHANGE_NAME);// 死信exchange
		AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
		builder.expiration("3000").deliveryMode(2);// 設(shè)置消息TTL
		AMQP.BasicProperties properties = builder.build();
		channel.queueDeclare(QU_declare_15S, false, false, false, args);
		channel.queueBind(QU_declare_15S, EX_declare_15S, ROUTINGKEY);
		channel.basicPublish(EX_declare_15S, ROUTINGKEY, properties, message.getBytes());
		System.out.println("send message in QA_DEFERRED_15S" + message + "time" + System.currentTimeMillis());
	public static void putDeclreQueue(String message, int mis, String queue) throws IOException {
		builder.expiration(String.valueOf(mis)).deliveryMode(2);// 設(shè)置消息TTL
		channel.queueDeclare(queue, false, false, false, args);
		channel.queueBind(queue, EX_declare_15S, ROUTINGKEY);
		System.out.println("send message in " + queue + message + "time============" + System.currentTimeMillis());
	public static void main(String args[]) throws Exception {
		init();
		consume();
}

消息通過dlx轉(zhuǎn)發(fā)的情況下,header頭部會帶有x-death的一個數(shù)組,里面包含消息的各項屬性,比如說消息成為死信的原因reason,original-expiration這個字段表示消息在原來隊列中的過期時間,根據(jù)這個值來確定下一次通知的延遲時間應(yīng)該是多少秒。運行結(jié)果如下:

RabbitMQ延遲隊列如何實現(xiàn)訂單支付結(jié)果異步階梯性通知

RabbitMQ延遲隊列如何實現(xiàn)訂單支付結(jié)果異步階梯性通知

RabbitMQ延遲隊列如何實現(xiàn)訂單支付結(jié)果異步階梯性通知

看完了這篇文章,相信你對“RabbitMQ延遲隊列如何實現(xiàn)訂單支付結(jié)果異步階梯性通知”有了一定的了解,如果想了解更多相關(guān)知識,歡迎關(guān)注億速云行業(yè)資訊頻道,感謝各位的閱讀!

向AI問一下細節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI