溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點(diǎn)擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

EMQ X Redis數(shù)據(jù)持久化怎么實現(xiàn)

發(fā)布時間:2022-01-15 16:54:30 來源:億速云 閱讀:277 作者:iii 欄目:互聯(lián)網(wǎng)科技

今天小編給大家分享一下EMQ X Redis數(shù)據(jù)持久化怎么實現(xiàn)的相關(guān)知識點(diǎn),內(nèi)容詳細(xì),邏輯清晰,相信大部分人都還太了解這方面的知識,所以分享這篇文章給大家參考一下,希望大家閱讀完這篇文章后有所收獲,下面我們一起來了解一下吧。

EMQ X 數(shù)據(jù)持久化簡介

數(shù)據(jù)持久化的主要使用場景包括將客戶端上下線狀態(tài),訂閱主題信息,消息內(nèi)容,消息抵達(dá)后發(fā)送消息回執(zhí)等操作記錄到 Redis、MySQL、PostgreSQL、MongoDB、Cassandra、AWS DynamoDB 等各類數(shù)據(jù)庫中供外部服務(wù)快速查詢或在服務(wù)宕機(jī)/客戶端異常離線時保留當(dāng)前運(yùn)行狀態(tài),連接恢復(fù)時恢復(fù)到之前狀態(tài);持久化亦可用于客戶端代理訂閱,設(shè)備客戶端上線時,持久化模塊直接從數(shù)據(jù)庫加載預(yù)設(shè)的主題并完成代理訂閱,降低系統(tǒng)設(shè)計復(fù)雜度和減少客戶端訂閱通信開銷。

用戶也可以通過訂閱相關(guān)主題的方式來實現(xiàn)類似的功能,但是在企業(yè)版中內(nèi)置的這些持久化的支持執(zhí)行效率更高、可靠性更強(qiáng),大大降低了開發(fā)者的工作量并提升了系統(tǒng)穩(wěn)定性。

數(shù)據(jù)持久化是 EMQ X 的重要功能,僅在企業(yè)版支持。

持久化設(shè)計

持久化原理是配置事件鉤子觸發(fā)時調(diào)用處理函數(shù)(action),處理函數(shù)獲取到相應(yīng)的數(shù)據(jù)后按照配置的指令進(jìn)行處理,實現(xiàn)數(shù)據(jù)的增、刪、改、查。相同事件鉤子在不同數(shù)據(jù)庫中可用參數(shù)是一樣的,但處理函數(shù)(action)因數(shù)據(jù)庫特性不同有所差異。整個持久化工作模式和流程如下:

一對一消息存儲

EMQ X Redis數(shù)據(jù)持久化怎么實現(xiàn)

  1. Publish 端發(fā)布一條消息;

  2. Backend 將消息記錄數(shù)據(jù)庫中;

  3. Subscribe 端訂閱主題;

  4. Backend 從數(shù)據(jù)庫中獲取該主題的消息;

  5. 發(fā)送消息給 Subscribe 端;

  6. Subscribe 端確認(rèn)后 Backend 從數(shù)據(jù)庫中移除該消息;

一對多消息存儲

EMQ X Redis數(shù)據(jù)持久化怎么實現(xiàn)

  1. PUB 端發(fā)布一條消息;

  2. Backend 將消息記錄在數(shù)據(jù)庫中;

  3. SUB1 和 SUB2 訂閱主題;

  4. Backend 從數(shù)據(jù)庫中獲取該主題的消息;

  5. 發(fā)送消息給 SUB1 和 SUB2;

  6. Backend 記錄 SUB1 和 SUB2 已讀消息位置,下次獲取消息從該位置開始。

Redis 數(shù)據(jù)持久化

本文以實際例子來說明如何通過 Redis 來存儲相關(guān)的信息。

Redis 是完全開源免費(fèi)遵守 BSD 協(xié)議的高性能 key-value 數(shù)據(jù)庫。

相比其他 key-value 緩存產(chǎn)品 Redis 有以下特點(diǎn):

  • Redis 性能極高,單機(jī)支持十萬級別的讀寫速度。

  • Redis 支持?jǐn)?shù)據(jù)的持久化,可以將內(nèi)存中的數(shù)據(jù)保存在磁盤中,重啟的時候可以再次加載進(jìn)行使用。

  • Redis 不僅僅支持簡單的 key-value 類型的數(shù)據(jù),同時還提供 list,set,zset,hash 等數(shù)據(jù)結(jié)構(gòu)的存儲。

  • Redis 支持?jǐn)?shù)據(jù)的備份,即 master-slave 模式的數(shù)據(jù)備份。

讀者可以參考 Redis 官方的 Quick Start 來安裝 Redis(寫本文的時候,Redis 版本為5.0),通過 redis-server 命令來啟動 Redis 服務(wù)器。

配置 EMQ X 服務(wù)器

通過 RPM 方式安裝的 EMQ X,Redis 相關(guān)的配置文件位于 /etc/emqx/plugins/emqx_backend_redis.conf,如果只是測試 Redis 持久化的功能,大部分配置不需要做更改。唯一需要更改的地方可能是 Redis 服務(wù)器的地址:如果讀者安裝的 Redis 不與 EMQ X 在同一服務(wù)器上,請指定正確的 Redis 服務(wù)器的地址與端口。如下所示:

## Redis Server 127.0.0.1:6379, Redis Sentinel: 127.0.0.1:26379
backend.redis.pool1.server = 127.0.0.1:6379

保持剩下部分的配置文件不變,然后啟動該插件:

emqx_ctl plugins load emqx_backend_redis

客戶端在線狀態(tài)存儲

客戶端上下線時,更新在線狀態(tài)、上下線時間、節(jié)點(diǎn)客戶端列表至 Redis 數(shù)據(jù)庫。

盡管 EMQ X 本身提供了設(shè)備在線狀態(tài) API,但在需要頻繁獲取客戶端在線狀態(tài)、上下線時間的場景下,直接從數(shù)據(jù)庫獲取該記錄比調(diào)用 EMQ X API 更高效。

配置項

打開配置文件,配置 Backend 規(guī)則:

## 上線
backend.redis.hook.client.connected.1    =  { "action": { "function": "on_client_connected" }, "pool": "pool1"}

## 下線
backend.redis.hook.client.disconnected.1 = {"action": {"function": "on_client_disconnected"}, "pool": "pool1"}

使用示例

瀏覽器打開 http://127.0.0.1:18083 EMQ X 管理控制臺,在 工具 -> Websocket 中新建一個客戶端連接,指定 clientid 為 sub_client:

EMQ X Redis數(shù)據(jù)持久化怎么實現(xiàn)

打開 redis-cli 命令行窗口,執(zhí)行命令 keys *,結(jié)果如下所示,讀者可以看到在 Redis 中存儲了兩個 key:

127.0.0.1:6379> keys *
1) "mqtt:node:emqx@127.0.0.1"
2) "mqtt:client:sub_client"

連接列表

插件以 mqtt:node:{node_name} 格式的 key 記錄節(jié)點(diǎn)下客戶端列表及連接時間戳信息,等效操作:

## redis key 為 mqtt:node:{node_name}
HMSET mqtt:node:emqx@127.0.0.1 sub_client 1542272836

字段說明:

## 節(jié)點(diǎn)下在線設(shè)備信息
127.0.0.1:6379> HGETALL mqtt:node:emqx@127.0.0.1
1) "sub_client1" # clientid
2) "1542272836" # 上線時間時間戳
3) "sub_client"
4) "1542272836"

連接詳細(xì)信息

插件以 mqtt:client:{client_id} 格式的 key 記錄客戶端在線狀態(tài)、上線時間,等效操作:

## redis key 為 mqtt:client:{client_id}
HMSET mqtt:client:sub_client state 1 online_at 1542272854

字段說明:

## 客戶端在線狀態(tài)
127.0.0.1:6379> HGETALL mqtt:client:sub_client
1) "state"
2) "0" # 0 離線 1 在線
3) "online_at"
4) "1542272854" # 上線時間戳
5) "offline_at"
6) "undefined" # 離線時間戳

客戶端代理訂閱

客戶端上線時,存儲模塊直接從數(shù)據(jù)庫讀取預(yù)設(shè)待訂閱列表,代理加載訂閱主題。在客戶端需要通過預(yù)定主題通信(接收消息)場景下,應(yīng)用能從數(shù)據(jù)層面設(shè)定 / 改變代理訂閱列表。

配置項

打開配置文件,配置 Backend 規(guī)則:

## hook: client.connected
## action/function: on_subscribe_lookup
backend.redis.hook.client.connected.2    = {"action": {"function": "on_subscribe_lookup"}, "pool": "pool1"}

使用示例

當(dāng) sub_client 設(shè)備上線時,需要為其訂閱 sub_client/upstreamsub_client/downlink 兩個 QoS 1 的主題:

  1. 插件以 mqtt:sub:{client_id} 格式 key 在 Redis 中初始化代理訂閱 Hash:

## redis key 為 mqtt:sub:{client_id}
## HSET key {topic} {qos}
127.0.0.1:6379> HSET mqtt:sub:sub_client sub_client/upstream 1
(integer) 0

127.0.0.1:6379> HSET mqtt:sub:sub_client sub_client/downlink 1
(integer) 0
  1. EMQ X 管理控制臺 WebSocket 頁面,以 clientid sub_client 新建一個客戶端連接,切換至訂閱頁面,可見當(dāng)前客戶端自動訂閱了 sub_client/upstreamsub_client/downlink 兩個 QoS 1 的主題:

EMQ X Redis數(shù)據(jù)持久化怎么實現(xiàn)

  1. 切換回管理控制臺 WebSocket 頁面,向 sub_client/downlink 主題發(fā)布消息,可在消息訂閱列表收到發(fā)布的消息。

持久化發(fā)布消息

配置項

打開配置文件,配置 Backend 規(guī)則,支持使用 topic 參數(shù)進(jìn)行消息過濾,此處使用 # 通配符存儲任意主題消息:

## hook: message.publish
## action/function: on_message_publish

backend.redis.hook.message.publish.1 = {"topic": "#", "action": {"function": "on_message_publish"}, "pool": "pool1"}

使用示例

在 EMQ X 管理控制臺 WebSocket 頁面中,使用 clientid sub_client 建立連接,向主題 upstream_topic 發(fā)布多條消息。針對每條消息, EMQ X 將持久化消息列表、消息詳情兩條記錄。

消息列表

EMQ X 將消息列表以 message id 持久化至 mqtt:msg:{topic} Redis 集合中:

## 獲取 upstream_topic 主題集合中所有 message id
127.0.0.1:6379> ZRANGE mqtt:msg:upstream_topic 0 -1
1) "2VFsyhDm0cPIQvnY9osj"
2) "2VFszTClyjpVtLDLrn1u"
3) "2VFszozkwkYOcbEy8QN9"
4) "2VFszpEc7DfbEqC97I3g"
5) "2VFszpSzRviADmcOeuXd"
6) "2VFszpm3kvvLkJTcdmGU"
7) "2VFt0kuNrOktefX6m4nP"
127.0.0.1:6379>

消息詳情

每條消息詳情將以 mqtt:msg:{message_id} 格式的 key 存儲在 Redis Hash 中:

## 獲取 message id 為 2VFt0kuNrOktefX6m4nP 的消息詳情
127.0.0.1:6379> HGETALL mqtt:msg:2VFt0kuNrOktefX6m4nP
 1) "id"
 2) "2VFt0kuNrOktefX6m4nP" ## message id
 3) "from"
 4) "sub_client" ## client id
 5) "qos"
 6) "2"
 7) "topic"
 8) "up/upstream_topic"
 9) "payload"
10) "{ "cmd": "reboot" }"
11) "ts"
12) "1542338754" ## pub 時間戳
13) "retain"
14) "false"

獲取離線消息

配置項

打開配置文件,配置 Backend 規(guī)則:

## hook: session.subscribed
## action/function: on_message_fetch_for_queue、on_message_fetch_for_pubsub

## 一對一離線消息
backend.redis.hook.session.subscribed.1  = {"topic": "queue/#", "action": {"function": "on_message_fetch_for_queue"}, "pool": "pool1"}

## 一對多離線消息
backend.redis.hook.session.subscribed.2  = {"topic": "pubsub/#", "action": {"function": "on_message_fetch_for_pubsub"}, "pool": "pool1"}

使用示例

MQTT 離線消息需滿足以下條件:

  1. 以 clean_session = false 連接

  2. 訂閱 QoS > 0

  3. 發(fā)布 QoS > 0

在 EMQ X 管理控制臺中以如下配置建立連接,

EMQ X Redis數(shù)據(jù)持久化怎么實現(xiàn)

持久化 Retain 消息

配置項

打開配置文件,配置 Backend 規(guī)則:

## hook: message.publish
## action/function: on_client_connected、on_message_retain

backend.redis.hook.message.publish.2     = {"topic": "#", "action": {"function": "on_message_retain"}, "pool": "pool1"}

backend.redis.hook.message.publish.3     = {"topic": "#", "action": {"function": "on_retain_delete"}, "pool": "pool1"}

消息列表

EMQ X 將消息列表以 message id 持久化至 mqtt:retain:{topic} Redis Hash 中:

## 獲取 upstream_topic 主題集合中所有 message id
127.0.0.1:6379> ZRANGE mqtt:retain:upstream_topic 0 -1
1) "2VFsyhDm0cPIQvnY9osj"
127.0.0.1:6379>

消息詳情

每條消息詳情將以 mqtt:msg:{message_id} 格式的 key 存儲在 Redis Hash 中:

## 獲取 message id 為 2VFt0kuNrOktefX6m4nP 的消息詳情
127.0.0.1:6379> HGETALL mqtt:msg:2VFt0kuNrOktefX6m4nP
 1) "id"
 2) "2VFt0kuNrOktefX6m4nP" ## message id
 3) "from"
 4) "sub_client" ## client id
 5) "qos"
 6) "2"
 7) "topic"
 8) "up/upstream_topic"
 9) "payload"
10) "{ "cmd": "reboot" }"
11) "ts"
12) "1542338754" ## pub 時間戳
13) "retain"
14) "false"

以上就是“EMQ X Redis數(shù)據(jù)持久化怎么實現(xiàn)”這篇文章的所有內(nèi)容,感謝各位的閱讀!相信大家閱讀完這篇文章都有很大的收獲,小編每天都會為大家更新不同的知識,如果還想學(xué)習(xí)更多的知識,請關(guān)注億速云行業(yè)資訊頻道。

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI