溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

怎么使用MQTT構建高性能物聯(lián)網(wǎng)消息處理后臺

發(fā)布時間:2021-12-07 09:28:48 來源:億速云 閱讀:192 作者:iii 欄目:互聯(lián)網(wǎng)科技

本篇內(nèi)容介紹了“怎么使用MQTT構建高性能物聯(lián)網(wǎng)消息處理后臺”的有關知識,在實際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領大家學習一下如何處理這些情況吧!希望大家仔細閱讀,能夠學有所成!

背景

在各類物聯(lián)網(wǎng)項目中,設備產(chǎn)生的消息不僅僅作用于設備之間,還需要供業(yè)務系統(tǒng)使用以實現(xiàn)如安全審計、流量計費、數(shù)據(jù)統(tǒng)計、通知觸發(fā)等功能,類似很容易通過原型系統(tǒng)完成

該原型中需要在 EMQ X 上維護多個數(shù)據(jù)通道,以供每個業(yè)務環(huán)節(jié)按照各自需求從 EMQ X 中獲取消息數(shù)據(jù)。這種解決方案的問題在于:

  • 每個業(yè)務需要與 EMQ X 建立數(shù)據(jù)通道,數(shù)據(jù)通道的建立與保持需要額外的資源開銷,數(shù)據(jù)同步速度嚴重影響 EMQ X 高速消息交換;

  • 隨著業(yè)務增長,每次新增業(yè)務環(huán)節(jié)都需要牽動整個系統(tǒng)變更;

  • 由于每個環(huán)節(jié)處理速度與時序不一樣,消息量較大時部分業(yè)務會出現(xiàn)阻塞情況,進一步產(chǎn)生數(shù)據(jù)丟失、系統(tǒng)穩(wěn)定性降低等嚴重后果。

以上問題與當下互聯(lián)網(wǎng)應用中遇到的問題高度一致,即多個業(yè)務系統(tǒng)之間的數(shù)據(jù)集成與數(shù)據(jù)同步問題?;ヂ?lián)網(wǎng)應用中普遍集成消息隊列以進行削峰、限流、隊列處理等操作,實現(xiàn)數(shù)據(jù)與業(yè)務的解耦,借助 EMQ X 提供的 RabbitMQ、Kafka、RocketMQ、Pulsar 等消息與流中間件橋接功能,物聯(lián)網(wǎng)項目也可以使用該模型來解決以上問題。

本文以常見物聯(lián)網(wǎng)使用場景為例,介紹了如何利用 EMQ X 消息中間件與開源流處理平臺 Kafka 處理物聯(lián)網(wǎng)海量消息數(shù)據(jù),以高可靠、高容錯的方式存儲海量數(shù)據(jù)流并保證數(shù)據(jù)流的順序進行消息數(shù)據(jù)存儲,同時有效地將消息數(shù)據(jù)提供給多個業(yè)務環(huán)節(jié)使用。

業(yè)務場景

假設現(xiàn)在有一個智能門鎖項目,所有門鎖每間隔 1 分鐘或任何時間開/關鎖等門鎖狀態(tài)變更時上報一次門鎖信息,上報 MQTT 主題如下(QoS = 1):

devices/{client_id}/state

每個設備發(fā)送的數(shù)據(jù)格式為 JSON,包括門鎖電量、開鎖狀態(tài)、操作結果等數(shù)據(jù),內(nèi)容如下:

{
  "process_id": "7802441525528958",
  "action": "unlock",
  "battery": 83.4,
  "lock_state": 1,
  "version": 1.1,
  "client_id": "10083618796833171"
}

每個門鎖均訂閱一個唯一的主題,作為遠程下發(fā)開鎖指令,下發(fā) MQTT 主題如下(QoS = 1):

devices/{client_id}/command

下發(fā)的數(shù)據(jù)包括開鎖指令、消息加密驗證信息等:

{
  "process_id": "7802441525528958",
  "action": "unlock",
  "nonce_str": "u7u4p0n8",
  "ts": 1574744434,
  "sign": "e9f5af7deaa28563"
}

上行、下行消息數(shù)據(jù)需要供以下三個業(yè)務環(huán)節(jié)使用:

  • 消息通知:將開鎖狀態(tài)通知到門鎖用戶綁定的通知方式(手機短信、郵件);

  • 狀態(tài)監(jiān)控:分析處理門鎖定時上報的狀態(tài)信息,如果電量、狀態(tài)異常等需觸發(fā)告警通知用戶;

  • 安全審計:分析上下行消息數(shù)據(jù),記錄用戶開鎖行為,同時防范下行指令被篡改、重放等方式攻擊。

該方案中,EMQ X 會將以上主題的消息統(tǒng)一橋接到 Kafka 供業(yè)務系統(tǒng)使用,實現(xiàn)業(yè)務系統(tǒng)與 EMQ X 解耦。

client_id 為門鎖 ID,同門鎖連接至 EMQ X 使用的 MQTT Client ID。

方案介紹

Kafka 是由 Apache 軟件基金會開發(fā)的一個開源流處理平臺,由 Scala 和 Java 編寫。該項目的目標是為處理實時數(shù)據(jù)提供一個統(tǒng)一、高吞吐、低延遲的平臺。

kafka 有以下特性:

  • 高吞吐量:吞吐量高達數(shù)十萬高并發(fā),支持數(shù)千個客戶端同時讀寫;

  • 低延遲:延遲最低只有幾毫秒,輕松構建實時流應用程序;

  • 數(shù)據(jù)可靠性:將消息數(shù)據(jù)安全地分布式存儲,復制到容錯集群中,嚴格按照隊列順序處理,提供消息事務支持,保證數(shù)據(jù)完整性和消費可靠性;

  • 集群容錯性:多節(jié)點副本中,允許 n-1 個節(jié)點失敗

  • 可擴展性:支持集群動態(tài)擴展。

該方案中集成 Kafka 為 EMQ X 消息服務器與應用程序之間的消息傳遞提供消息隊列與消息總線。生產(chǎn)者(EMQ X)往隊列末尾添加數(shù)據(jù),每個消費者(業(yè)務環(huán)節(jié))依次讀取數(shù)據(jù)然后自行處理,這種架構兼顧了性能與數(shù)據(jù)可靠性,并有效降低系統(tǒng)復雜度、提升系統(tǒng)擴展性。該方案原型如下:

怎么使用MQTT構建高性能物聯(lián)網(wǎng)消息處理后臺

EMQ X Enterprise 安裝

安裝

如果您是 EMQ X 新手用戶,推薦通過 EMQ X 指南 快速上手

訪問 EMQ 官網(wǎng) 下載適合您操作系統(tǒng)的安裝包,由于數(shù)據(jù)持久化是企業(yè)功能,您需要下載 EMQ X 企業(yè)版(可以申請 License 試用) 寫本文的時候 EMQ X 企業(yè)版最新版本為 v3.4.4,下載 zip 包的啟動步驟如下 :

## 解壓下載好的安裝包
unzip emqx-ee-macosx-v3.4.4.zip
cd emqx

## 將 License 文件復制到 EMQ X 指定目錄 etc/, License 需自行申請試用或通過購買授權獲取
cp ../emqx.lic ./etc

## 以 console 模式啟動 EMQ X
./bin/emqx console

修改配置

本文中需要用到的配置文件如下:

  1. License 文件,EMQ X 企業(yè)版 License 文件,使用可用的 License 覆蓋:

etc/emqx.lic
  1. EMQ X Kafka 消息存儲插件配置文件,用于配置 Kafka 連接信息、數(shù)據(jù)橋接主題:

etc/plugins/emqx_bridge_kafka.conf

根據(jù)部署實際情況填寫插件配置信息如下,其余配置項請熟讀配置文件做出調(diào)整或直接使用默認配置即可:

## 連接地址
bridge.kafka.servers = 127.0.0.1:9092

## 需要處理的 Hooks 由于我們使用 QoS 1 的進行消息傳送,可以使用 ack hooks 
## 注釋其他無關事件、消息 Hooks

## bridge.kafka.hook.client.connected.1     = {"topic":"client_connected"}
## bridge.kafka.hook.client.disconnected.1  = {"topic":"client_disconnected"}
## bridge.kafka.hook.session.subscribed.1   = {"filter":"#", "topic":"session_subscribed"}
## bridge.kafka.hook.session.unsubscribed.1 = {"filter":"#", "topic":"session_unsubscribed"}
## bridge.kafka.hook.message.deliver.1      = {"filter":"#", "topic":"message_deliver"}

## filter 為需要處理的 MQTT 主題, topoc 為寫入的 Kafka 主題
## 注冊多個 Hooks 實現(xiàn)上行、下行消息處理

## 上報指令選擇 publish hooks
bridge.kafka.hook.message.publish.1        = {"filter":"devices/+/state", "topic":"message_state"}

## 下發(fā)指令選擇 acked hooks,確保消息抵達才入庫
bridge.kafka.hook.message.acked.1       = {"filter":"devices/+/command", "topic":"message_command"}

Kafka 安裝與初始化

通過 Docker 進行安裝 Kafka,映射數(shù)據(jù) 9092 端口供連接使用,Kafka 依賴 Zookeeper,下面提供完整安裝命令:

## 安裝 Zookeeper
docker run -d --name zookeeper -p 2181 -t wurstmeister/zookeeper

## 安裝并配置 Kafka
docker run -d --name kafka --publish 9092:9092 
		--link zookeeper --env KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 
		--env KAFKA_ADVERTISED_HOST_NAME=127.0.0.1 
		--env KAFKA_ADVERTISED_PORT=9092 
		wurstmeister/kafka:latest

預先在 Kafka 創(chuàng)建需要使用的主題:

## 進入 Kafka Docker 容器
docker exec -it kafka bash

## 上行數(shù)據(jù)主題 message_state
kafka-topics.sh --create --zookeeper zookeeper:2181 --replication-factor 1 --partitions 1 --topic message_state

## 下行數(shù)據(jù)主題 message_command
kafka-topics.sh --create --zookeeper zookeeper:2181 --replication-factor 1 --partitions 1 --topic message_command

至此,可以重啟 EMQ X 并啟動插件以應用以上配置:

./bin/emqx stop

./bin/emqx start

## 或使用 console 模式可以看到更多信息
./bin/emqx console

## 啟動插件
./bin/emqx_ctl plugins load emqx_bridge_kafka

## 啟動成功后會有以下提示
Plugin load emqx_bridge_kafka loaded successfully.

模擬測試

使用 kafka-console-consumer 啟動消費

該方案中三個業(yè)務環(huán)節(jié)詳細實現(xiàn)本文不再贅述,本文僅需保證消息寫入 Kafka 即可,可以使用 Kafka 自帶的消費命令查看主題內(nèi)的數(shù)據(jù):

## 進入 Kafka Docker 容器
docker exec -it kafka bash

## 上行數(shù)據(jù)主題
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic message_state --from-beginning

## 開啟另外一個窗口查看下行數(shù)據(jù)主題
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic message_command --from-beginning

命令成功執(zhí)行后將阻塞等待消費該主題的數(shù)據(jù),我們繼續(xù)后續(xù)操作。

模擬測試數(shù)據(jù)收發(fā)

通過 EMQ X 管理控制臺中的 WebSocket 工具可以模擬智能門鎖上/下行業(yè)務數(shù)據(jù)。瀏覽器打開 http://127.0.0.1:1883 進入 EMQ X 管理控制臺,打開 Tool -> WebSocket 功能,輸入連接信息建立 MQTT 連接模擬門鎖設備。連接信息里 Client ID 根據(jù)業(yè)務指定,本文使用 10083618796833171 。

訂閱下行控制主題

根據(jù)業(yè)務需求,需訂閱門鎖專屬下行控制主題 devices/{client_id}/command,此處需訂閱 devices/10083618796833171/command 主題并設置 QoS = 1

怎么使用MQTT構建高性能物聯(lián)網(wǎng)消息處理后臺

模擬下發(fā)指令

向門鎖控制主題 devices/{client_id}/command 發(fā)送開鎖指令,此處下發(fā)數(shù)據(jù)為:

  • 主題:devices/10083618796833171/command

  • QoS:1

  • payload:

    {
      "process_id": "7802441525528958",
      "action": "unlock",
      "nonce_str": "u7u4p0n8",
      "ts": 1574744434,
      "sign": "e9f5af7deaa28563"
    }


下發(fā)成功后管理控制臺 Publish 界面可以收到一條消息:

怎么使用MQTT構建高性能物聯(lián)網(wǎng)消息處理后臺

同時 Kafka message_command 主題消費者將收到一條或多條消息(EMQ X ack hooks 觸發(fā)次數(shù)以實際收到消息客戶端數(shù)量為準),消息為 JSON 格式,內(nèi)容經(jīng)格式化后如下:

{
  "client_id": "10083618796833171",
  "username": "",
  "from": "10083618796833171",
  "topic": "devices/10083618796833171/command",
  "payload": "eyAgICJwcm9jZXNzX2lkIjogIjc4MDI0NDE1MjU1Mjg5NTgiLCAgICJhY3Rpb24iOiAidW5sb2NrIiwgICAibm9uY2Vfc3RyIjogInU3dTRwMG44IiwgICAidHMiOiAxNTc0NzQ0NDM0LCAgICJzaWduIjogImU5ZjVhZjdkZWFhMjg1NjMiIH0=",
  "qos": 1,
  "node": "emqx@127.0.0.1",
  "ts": 1574751635845
}

該條消息包含了 MQTT 接收/發(fā)布客戶端信息與 Base64 編碼后的 Payload 數(shù)據(jù):

  • client_id: 接收客戶端 client_id

  • username: 接受客戶端 username

  • from: 發(fā)布客戶端 client_id

  • topic: 消息發(fā)布目標主題

  • payload: 經(jīng) Base64 編碼后的消息 Payload

  • qos: 消息 QoS

  • node: 消息處理節(jié)點

  • ts: hooks 毫秒級觸發(fā)時間戳

模擬上報狀態(tài)

向門鎖控制主題 devices/{client_id}/state 發(fā)送狀態(tài)數(shù)據(jù),此處發(fā)布數(shù)據(jù)為:

  • 主題:devices/10083618796833171/state

  • QoS:1

  • payload:

    {
      "process_id": "7802441525528958",
      "action": "unlock",
      "battery": 83.4,
      "lock_state": 1,
      "version": 1.1,
      "client_id": "10083618796833171"
    }


上報成功后 Kafka message_state 消費者將收到一條消息(EMQ X publish hooks 觸發(fā)次數(shù)與發(fā)布消息有關,與消息主題是否被訂閱以及訂閱數(shù)量無關),消息為 JSON 格式,內(nèi)容經(jīng)格式化后如下:

{
  "client_id": "10083618796833171",
  "username": "",
  "topic": "devices/10083618796833171/state",
  "payload": "eyAgICJwcm9jZXNzX2lkIjogIjc4MDI0NDE1MjU1Mjg5NTgiLCAgICJhY3Rpb24iOiAidW5sb2NrIiwgICAiYmF0dGVyeSI6IDgzLjQsICAgImxvY2tfc3RhdGUiOiAxLCAgICJ2ZXJzaW9uIjogMS4xLCAgICJjbGllbnRfaWQiOiAiMTAwODM2MTg3OTY4MzMxNzEiIH0=",
  "qos": 1,
  "node": "emqx@127.0.0.1",
  "ts": 1574753026269
}

該條消息僅包含 MQTT 發(fā)布客戶端信息與 Base64 編碼后的 Payload 數(shù)據(jù):

  • client_id: 發(fā)布客戶端 client_id

  • username:發(fā)布客戶端 username

  • topic: 消息發(fā)布目標主題

  • payload: 經(jīng) Base64 編碼后的消息 Payload

  • qos: 消息 QoS

  • node: 消息處理節(jié)點

  • ts: hooks 毫秒級觸發(fā)時間戳

至此,我們成功完成 EMQ X 橋接消息至 Kafka 所有步驟,業(yè)務系統(tǒng)接入 Kafka 后可以根據(jù)消費到的消息數(shù)量、消息發(fā)布者/訂閱者的 client_id 以及消息 payload 內(nèi)容進行業(yè)務判斷,實現(xiàn)所需業(yè)務功能。

性能測試

如果讀者對該方案的性能感興趣,可以采用 MQTT-JMeter 插件對其進行測試。需要注意的是,讀者需要在性能測試過程中保證做好 EMQ 集群、Kafka 集群、Kafka 的消費者,以及 JMeter 測試集群相關的優(yōu)化與配置,才可以得到相關配置下正確的最佳性能測試結果。

“怎么使用MQTT構建高性能物聯(lián)網(wǎng)消息處理后臺”的內(nèi)容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關的知識可以關注億速云網(wǎng)站,小編將為大家輸出更多高質(zhì)量的實用文章!

向AI問一下細節(jié)

免責聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權內(nèi)容。

AI