溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

在SpringBoot中如何使用RedisTemplate重新消費Redis Stream中未ACK的消息

發(fā)布時間:2021-09-29 16:24:18 來源:億速云 閱讀:296 作者:柒染 欄目:編程語言

在SpringBoot中如何使用RedisTemplate重新消費Redis Stream中未ACK的消息,相信很多沒有經(jīng)驗的人對此束手無策,為此本文總結(jié)了問題出現(xiàn)的原因和解決方法,通過這篇文章希望你能解決這個問題。

消費組從stream中獲取到消息后,會分配給自己組中其中的一個消費者進行消費,消費者消費完畢,需要給消費組返回ACK,表示這條消息已經(jīng)消費完畢了。

當消費者從消費組獲取到消息的時候,會先把消息添加到自己的pending消息列表,當消費者給消費組返回ACK的時候,就會把這條消息從pending隊列刪除。(每個消費者都有自己的pending消息隊列)

消費者可能沒有及時的返回ACK。例如消費者消費完畢后,宕機,沒有及時返回ACK,此時就會導(dǎo)致這條消息占用2倍的內(nèi)存(stream中保存一份, 消費者的的pending消息列表中保存一份)

開始之前,通過Redis客戶端模擬一點數(shù)據(jù)

1,新打開Redis客戶端(我們稱之為:生產(chǎn)端), 創(chuàng)建streamm,名稱叫做:my_stream

XADD my_stream * hello world

隨便添加一條消息,目的是為了初始化stream

2,創(chuàng)建一個消費組,名稱叫做:my_group

XGROUP CREATE my_stream my_group $

3,再新啟動一個Redis客戶端(我們稱之為:消費端1),使用消費組進行阻塞消費,指定消費者:my_consumer1

XREADGROUP GROUP my_group  my_consumer1  BLOCK 0 STREAMS my_stream >

4,再新啟動一個Redis客戶端(我們稱之為:消費端2),使用消費組進行阻塞消費,指定消費者:my_consumer2

XREADGROUP GROUP my_group  my_consumer2  BLOCK 0 STREAMS my_stream >

5,通過生產(chǎn)端,推送3條消息

XADD my_stream * message1 Hello
XADD my_stream * message2 SpringBoot
XADD my_stream * message3 Community
生產(chǎn)端

在SpringBoot中如何使用RedisTemplate重新消費Redis Stream中未ACK的消息

消費端1

在SpringBoot中如何使用RedisTemplate重新消費Redis Stream中未ACK的消息

消費端2

在SpringBoot中如何使用RedisTemplate重新消費Redis Stream中未ACK的消息

可以看到,一共Push了3條消息,它們的ID分別是

  • 1605524648266-0 (message1 )

  • 1605524657157-0 (message2)

  • 1605524665215-0 (message3)

現(xiàn)在的狀況是,消費者1,消費了2條消息(message1和message3),消費者2,消費了1條消息(message2)。都是消費成功了的,但是它們都還沒有進行ACK

在客戶端,消費者消費到一條消息后會立即返回,需要重新執(zhí)行命令,來回到阻塞狀態(tài)

ACK消息

現(xiàn)在我們打算,把消費者1,消費的那條message1進行ACK

XACK my_stream my_group  1605524648266-0

獲取指定消費組中,待確認(ACK)的消息

查看消費組的所有待確認消息統(tǒng)計

127.0.0.1:6379> XPENDING my_stream  my_group
1) (integer) 2       # 消費組中,所有消費者的pending消息數(shù)量
2) "1605524657157-0" # pending消息中的,最小消息ID
3) "1605524665215-0" # pending消息中的,最大消息ID
4) 1) 1) "my_consumer1"  # 消費者1
      2) "1"             # 有1條待確認消息
   2) 1) "my_consumer2"  # 消費者2
      2) "1"             # 有2條待確認消息

查看消費者1的待確認消息詳情

127.0.0.1:6379> XPENDING my_stream  my_group 0 + 10 my_consumer1
1) 1) "1605524665215-0"  # 待ACK消息ID
   2) "my_consumer1"     # 所屬消費者
   3) (integer) 847437   # 消息自從被消費者獲取后到現(xiàn)在過去的時間(毫秒) - idle time
   4) (integer) 1        # 消息被獲取的次數(shù)	- delivery counter

這條命令,表示查詢消費組my_group 中消費者my_consumer1的opending隊列,開始ID是0,結(jié)束ID是最大,最多檢索10個結(jié)果。

現(xiàn)在的情況就是,一共3條消息,消費者1消費了2條,ack了1條。消費者2消費了1條,沒有ack。消費者1和2,各自的pending隊列中都有一條未ack的消息

如何實現(xiàn)將未被成功消費的消息獲取出來重新進行消費?之前的演示,目的都是為了造一些數(shù)據(jù),所以是用的客戶端命令,從這里開始,所有的演示,都會使用spring-data-redis中的RedisTemplate。

遍歷消費者的pending列表,讀取到未ACK的消息,直接進行ACK

import java.time.Duration;
import java.util.List;
import java.util.Map;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.context.SpringBootTest.WebEnvironment;
import org.springframework.data.domain.Range;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.PendingMessages;
import org.springframework.data.redis.connection.stream.PendingMessagesSummary;
import org.springframework.data.redis.connection.stream.RecordId;
import org.springframework.data.redis.core.StreamOperations;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.test.context.junit4.SpringRunner;

import io.springboot.jwt.SpringBootJwtApplication;

@RunWith(SpringRunner.class)
@SpringBootTest(classes = SpringBootJwtApplication.class, webEnvironment = WebEnvironment.RANDOM_PORT)
public class RedisStreamTest {
	
	private static final Logger LOGGER = LoggerFactory.getLogger(RedisStreamTest.class);
	
	@Autowired
	private StringRedisTemplate stringRedisTemplate;
	
	@Test
	public void test() {
		StreamOperations<String, String, String> streamOperations = this.stringRedisTemplate.opsForStream();
		
		// 獲取my_group中的pending消息信息,本質(zhì)上就是執(zhí)行XPENDING指令
		PendingMessagesSummary pendingMessagesSummary = streamOperations.pending("my_stream", "my_group");
		
		// 所有pending消息的數(shù)量
		long totalPendingMessages = pendingMessagesSummary.getTotalPendingMessages();
		
		// 消費組名稱
		String groupName= pendingMessagesSummary.getGroupName();
		
		// pending隊列中的最小ID
		String minMessageId = pendingMessagesSummary.minMessageId();
		
		// pending隊列中的最大ID
		String maxMessageId = pendingMessagesSummary.maxMessageId();
		
		LOGGER.info("消費組:{},一共有{}條pending消息,最大ID={},最小ID={}", groupName, totalPendingMessages, minMessageId, maxMessageId);
		

		// 每個消費者的pending消息數(shù)量
		Map<String, Long> pendingMessagesPerConsumer = pendingMessagesSummary.getPendingMessagesPerConsumer();
		
		pendingMessagesPerConsumer.entrySet().forEach(entry -> {
			
			// 消費者
			String consumer = entry.getKey();
			// 消費者的pending消息數(shù)量
			long consumerTotalPendingMessages = entry.getValue();
			
			LOGGER.info("消費者:{},一共有{}條pending消息", consumer, consumerTotalPendingMessages);
			
			if (consumerTotalPendingMessages > 0) {
				// 讀取消費者pending隊列的前10條記錄,從ID=0的記錄開始,一直到ID最大值
				PendingMessages pendingMessages = streamOperations.pending("my_stream", Consumer.from("my_group", consumer), Range.closed("0", "+"), 10);
				
				// 遍歷所有Opending消息的詳情
				pendingMessages.forEach(message -> {
					// 消息的ID
					RecordId recordId =  message.getId();
					// 消息從消費組中獲取,到此刻的時間
					Duration elapsedTimeSinceLastDelivery = message.getElapsedTimeSinceLastDelivery();
					// 消息被獲取的次數(shù)
					long deliveryCount = message.getTotalDeliveryCount();
					
					LOGGER.info("openg消息,id={}, elapsedTimeSinceLastDelivery={}, deliveryCount={}", recordId, elapsedTimeSinceLastDelivery, deliveryCount);
					
					/**
					 * 演示手動消費的這個判斷非常的針對,目的就是要讀取消費者“my_consumer1”pending消息中,ID=1605524665215-0的這條消息
					 */
					if (consumer.equals("my_consumer1") && recordId.toString().equals("1605524665215-0")) {
						// 通過streamOperations,直接讀取這條pending消息,
						List<MapRecord<String, String, String>> result = streamOperations.range("my_stream", Range.rightOpen("1605524665215-0", "1605524665215-0"));
						
						// 開始和結(jié)束都是同一個ID,所以結(jié)果只有一條
						MapRecord<String, String, String> record = result.get(0);
						
						// 這里執(zhí)行日志輸出,模擬的就是消費邏輯
						LOGGER.info("消費了pending消息:id={}, value={}", record.getId(), record.getValue());
						
						// 如果手動消費成功后,往消費組提交消息的ACK
						Long retVal = streamOperations.acknowledge("my_group", record);
						LOGGER.info("消息ack,一共ack了{}條", retVal);
					}
				});
			}
		});
	}
}

這種方式就是,遍歷消費組的pending消息情況,再遍歷每個消費者的pending消息id列表,再根據(jù)id,直接去stream讀取這條消息,進行消費Ack。

輸出日志

消費組:my_group,一共有2條pending消息,最大ID=1605524657157-0,最小ID=1605524665215-0
消費者:my_consumer1,一共有1條pending消息
openg消息,id=1605524665215-0, elapsedTimeSinceLastDelivery=PT1H9M4.061S, deliveryCount=1
消費了pending消息:id=1605524665215-0, value={message3=Community}
消息ack,一共ack了1條
消費者:my_consumer2,一共有1條pending消息
openg消息,id=1605524657157-0, elapsedTimeSinceLastDelivery=PT1H9M12.172S, deliveryCount=1

最終的結(jié)果就是,消費者1的唯一一條pending消息被Ack了,這里有幾個點要注意

  1. 遍歷消費者pending列表時候,最小/大消息id,可以根據(jù)XPENDING指令中的結(jié)果來,我寫0 - +,只是為了偷懶

  2. 遍歷到消費者pending消息的時候,可以根據(jù)elapsedTimeSinceLastDelivery (idle time)和deliveryCount (delivery counter)做一些邏輯判斷,elapsedTimeSinceLastDelivery 越長,表示這條消息被消費了很久,都沒Ack,deliveryCount 表示重新投遞N次后(下文會講),都沒被消費成功,可能是消費邏輯有問題,或者是Ack有問題。

再次查看XPENDING信息

127.0.0.1:6379> XPENDING my_stream  my_group 
1) (integer) 1
2) "1605524657157-0"
3) "1605524657157-0"
4) 1) 1) "my_consumer2"
      2) "1"

消費者1,唯1條待ack的消息看,已經(jīng)被我們遍歷出來手動消費,手動ack了,所以只剩下消費者2還有1條pending消息。。

通過XCLAIM改變消息的消費者

如果一個消費者,一直不能消費掉某條消息,或者說這個消費者因為某些消息,永遠也不能上過線了,那么可以把這個消費者的pending消息,轉(zhuǎn)移到其他的消費者pending列表中,重新消費

其實我們這里要做的事情,就是把“消費者2”的唯一1條pending消息“ 1605524657157-0”(message2),交給“消費者1”,重新進行消費。

Redis命令的實現(xiàn)

XCLAIM my_stream  my_group my_consumer1 10000 1605524657157-0

1605524657157-0這條消息,重新給my_group 中的my_consumer1 進行消費,前提條件是這條消息的idle time大于了10秒鐘(從獲取消息到現(xiàn)在超過10秒都沒Ack)。

Java客戶端的實現(xiàn)

import java.time.Duration;
import java.util.List;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.context.SpringBootTest.WebEnvironment;
import org.springframework.dao.DataAccessException;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.connection.stream.ByteRecord;
import org.springframework.data.redis.connection.stream.RecordId;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.test.context.junit4.SpringRunner;

import io.springboot.jwt.SpringBootJwtApplication;

@RunWith(SpringRunner.class)
@SpringBootTest(classes = SpringBootJwtApplication.class, webEnvironment = WebEnvironment.RANDOM_PORT)
public class RedisStreamTest {
	
	private static final Logger LOGGER = LoggerFactory.getLogger(RedisStreamTest.class);
	
	@Autowired
	private StringRedisTemplate stringRedisTemplate;
	
	@Test
	public void test() {
		List<ByteRecord> retVal = this.stringRedisTemplate.execute(new RedisCallback<List<ByteRecord>>() {
			@Override
			public List<ByteRecord> doInRedis(RedisConnection connection) throws DataAccessException {
				// XCLAIM 指令的實現(xiàn)方法
				return connection.streamCommands().xClaim("my_stream".getBytes(), "my_group", "my_consumer1", Duration.ofSeconds(10), RecordId.of("1605524657157-0"));
			}
		});
		
		for (ByteRecord byteRecord : retVal) {
			LOGGER.info("改了消息的消費者:id={}, value={}", byteRecord.getId(), byteRecord.getValue());
		}
	}
}
日志輸出
改了消息的消費者:id=1605524657157-0, value={[B@10b4f345=[B@63de4fa}
再次查看XPENDING信息
127.0.0.1:6379> XPENDING my_stream  my_group 
1) (integer) 1
2) "1605524657157-0"
3) "1605524657157-0"
4) 1) 1) "my_consumer1"
      2) "1"

可以看到,消息 “1605524657157-0”(message2),已經(jīng)從“消費者2”名下,轉(zhuǎn)移到了”消費者1”,接下來要做的事情,就是遍歷“消費者1”的pending列表,消費掉它。

讀取pending消息列表,進行消費

最開始在控制,演示了通過客戶端,進行消費者阻塞消費的時候,寫了一條命令

XREADGROUP GROUP my_group  my_consumer1  BLOCK 0 STREAMS my_stream >

其中最后那個>,表示ID,是一個特殊字符,如果不是,當ID不是特殊字符>時, XREADGROUP不再是從消息隊列中讀取消息, 而是從消費者的的pending消息列表中讀取歷史消息。(一般將參數(shù)設(shè)為0-0,表示讀取所有的pending消息)

Redis命令

127.0.0.1:6379> XREADGROUP GROUP my_group  my_consumer1  BLOCK 0 STREAMS my_stream 0
1) 1) "my_stream"
   2) 1) 1) "1605524657157-0"
         2) 1) "message2"
            2) "SpringBoot"

讀取到了,消費者1,pending消息中的唯一一條消息記錄

Java實現(xiàn)

import java.util.List;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.context.SpringBootTest.WebEnvironment;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.core.StreamOperations;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.test.context.junit4.SpringRunner;

import io.springboot.jwt.SpringBootJwtApplication;

@RunWith(SpringRunner.class)
@SpringBootTest(classes = SpringBootJwtApplication.class, webEnvironment = WebEnvironment.RANDOM_PORT)
public class RedisStreamTest {
	
	private static final Logger LOGGER = LoggerFactory.getLogger(RedisStreamTest.class);
	
	@Autowired
	private StringRedisTemplate stringRedisTemplate;
	
	@SuppressWarnings("unchecked")
	@Test
	public void test() {
		
		StreamOperations<String, String, String> streamOperations = this.stringRedisTemplate.opsForStream();
		
		// 從消費者的pending隊列中讀取消息
		List<MapRecord<String, String, String>>  retVal = streamOperations.read(Consumer.from("my_group", "my_consumer1"), StreamOffset.create("my_stream", ReadOffset.from("0")));
		
		// 遍歷消息
		for (MapRecord<String, String, String> record : retVal ) {
			// 消費消息
			LOGGER.info("消息id={}, 消息value={}", record.getId(), record.getValue());
			// 手動ack消息
			streamOperations.acknowledge("my_group", record);
		}
	}
}

這種方式,就是直接從消費者的pending隊列中讀取數(shù)據(jù),手動進行消費,然后Ack

日志
消息id=1605524657157-0, 消息value={message2=SpringBoot}
再次查看XPENDING信息
127.0.0.1:6379> XPENDING my_stream  my_group 
1) (integer) 0
2) (nil)
3) (nil)
4) (nil)

沒了,一條都沒,全部已經(jīng)Ack了。

死信

死信,就是一直沒法被消費的消息,可以根據(jù)這個兩個屬性idle timedelivery counter 進行判斷

idle time 當消息被消費者讀取后,就會開始計時,如果一個pending消息的idle time很長,表示這消息,可能是在Ack時發(fā)生了異常,或者說還沒來得及Ack,消費者就宕機了,導(dǎo)致一直沒有被Ack,當消息發(fā)生了轉(zhuǎn)移,它會清零,重新計時。

delivery counter,它表示轉(zhuǎn)移的次數(shù),每當一條消息的消費者發(fā)生變更的時候,它的值都會+1,如果一條pending消息的delivery counter值很大,表示它在多個消費者之間進行了多次轉(zhuǎn)移都沒法成功消費,可以人工的讀取,消費掉。


redis5的stream,可以說功能還是蠻強大(設(shè)計上狠狠借鑒了一把Kakfa)。如果應(yīng)用規(guī)模并不大,需要一個MQ服務(wù),我想Stream的你可以試試看,比起自己搭建kakfa,RocketMQ之類的,來的快當而且更好維護。

看完上述內(nèi)容,你們掌握在SpringBoot中如何使用RedisTemplate重新消費Redis Stream中未ACK的消息的方法了嗎?如果還想學(xué)到更多技能或想了解更多相關(guān)內(nèi)容,歡迎關(guān)注億速云行業(yè)資訊頻道,感謝各位的閱讀!

向AI問一下細節(jié)

免責聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI