溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點(diǎn)擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

怎么在 Rust 中使用 MQTT

發(fā)布時間:2021-07-27 09:21:07 來源:億速云 閱讀:212 作者:chen 欄目:互聯(lián)網(wǎng)科技

本篇內(nèi)容主要講解“怎么在 Rust 中使用 MQTT”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實(shí)用性強(qiáng)。下面就讓小編來帶大家學(xué)習(xí)“怎么在 Rust 中使用 MQTT”吧!

Rust 是由 Mozilla 主導(dǎo)開發(fā)的通用、編譯型編程語言。該語言的設(shè)計準(zhǔn)則為:安全、并發(fā)、實(shí)用,支持 函數(shù)式、并發(fā)式、過程式以及面向?qū)ο蟮木幊田L(fēng)格。Rust 速度驚人且內(nèi)存利用率極高。由于沒有運(yùn)行時和垃圾回收,它能夠勝任對性能要求特別高的服務(wù),可以在嵌入式設(shè)備上運(yùn)行,還能輕松和其他語言集成。Rust 豐富的類型系統(tǒng)和所有權(quán)模型保證了內(nèi)存安全和線程安全,讓您在編譯期就能夠消除各種各樣的錯誤。

MQTT 是一種基于發(fā)布/訂閱模式的 輕量級物聯(lián)網(wǎng)消息傳輸協(xié)議 ,可以用極少的代碼和帶寬為聯(lián)網(wǎng)設(shè)備提供實(shí)時可靠的消息服務(wù),它廣泛應(yīng)用于物聯(lián)網(wǎng)、移動互聯(lián)網(wǎng)、智能硬件、車聯(lián)網(wǎng)、電力能源等行業(yè)。

本文主要介紹如何在 Rust 項目中使用 paho-mqtt 客戶端庫 ,實(shí)現(xiàn)客戶端與 MQTT 服務(wù)器的連接、訂閱、取消訂閱、收發(fā)消息等功能。

項目初始化

本項目使用 Rust 1.44.0 進(jìn)行開發(fā)測試,并使用 Cargo 1.44.0 包管理工具進(jìn)行項目管理,讀者可用如下命令查看當(dāng)前的 Rust 版本。

~ rustc --version
rustc 1.44.0 (49cae5576 2020-06-01)

選擇 MQTT 客戶端庫

paho-mqtt 是目前 Rust 中,功能完善且使用較多的 MQTT 客戶端,最新的 0.7.1 版本支持 MQTT v5、3.1.1、3.1,支持通過標(biāo)準(zhǔn) TCP、SSL / TLS、WebSockets 傳輸數(shù)據(jù),QoS 支持 0、1、2 等。

初始化項目

執(zhí)行以下命令創(chuàng)建名為 mqtt-example 的 Rust 新項目。

~ cargo new mqtt-example
    Created binary (application) `mqtt-example` package

編輯項目中的 Cargo.toml 文件,在 dependencies 中添加 paho-mqtt 庫的地址,以及指定訂閱、發(fā)布代碼文件對應(yīng)的二進(jìn)制文件。

[dependencies]
paho-mqtt = { git = "https://github.com/eclipse/paho.mqtt.rust.git", branch = "master" }

[[bin]]
name = "sub"
path = "src/sub/main.rs"

[[bin]]
name = "pub"
path = "src/pub/main.rs"

Rust MQTT 的使用

創(chuàng)建客戶端連接

本文將使用 EMQ X 提供的 免費(fèi)公共 MQTT 服務(wù)器 作為測試連接的 MQTT 服務(wù)器,該服務(wù)基于 EMQ X 的 MQTT 物聯(lián)網(wǎng)云平臺 創(chuàng)建。服務(wù)器接入信息如下:

  • Broker: broker.emqx.io

  • TCP Port: 1883

  • Websocket Port: 8083

配置 MQTT Broker 連接參數(shù)

配置 MQTT Broker 連接地址(包括端口)、topic (這里我們配置了兩個 topic ),以及客戶端 id。

const DFLT_BROKER:&str = "tcp://broker.emqx.io:1883";
const DFLT_CLIENT:&str = "rust_publish";
const DFLT_TOPICS:&[&str] = &["rust/mqtt", "rust/test"];

編寫 MQTT 連接代碼

編寫 MQTT 連接代碼,為了提升使用體驗(yàn),可在執(zhí)行二進(jìn)制文件時通過命令行參數(shù)的形式傳入連接地址。通常我們需要先創(chuàng)建一個客戶端,然后將該客戶端連接到 broker.emqx.io。

let host = env::args().nth(1).unwrap_or_else(||
    DFLT_BROKER.to_string()
);

// Define the set of options for the create.
// Use an ID for a persistent session.
let create_opts = mqtt::CreateOptionsBuilder::new()
    .server_uri(host)
    .client_id(DFLT_CLIENT.to_string())
    .finalize();

// Create a client.
let cli = mqtt::Client::new(create_opts).unwrap_or_else(|err| {
    println!("Error creating the client: {:?}", err);
    process::exit(1);
});

// Define the set of options for the connection.
let conn_opts = mqtt::ConnectOptionsBuilder::new()
    .keep_alive_interval(Duration::from_secs(20))
    .clean_session(true)
    .finalize();

// Connect and wait for it to complete or fail.
if let Err(e) = cli.connect(conn_opts) {
    println!("Unable to connect:\n\t{:?}", e);
    process::exit(1);
}

發(fā)布消息

這里我們總共發(fā)布五條消息,根據(jù)循環(huán)的奇偶性,分別向 rust/mqtt、 rust/test 這兩個主題發(fā)布。

for num in 0..5 {
    let content =  "Hello world! ".to_string() + &num.to_string();
    let mut msg = mqtt::Message::new(DFLT_TOPICS[0], content.clone(), QOS);
    if num % 2 == 0 {
        println!("Publishing messages on the {:?} topic", DFLT_TOPICS[1]);
        msg = mqtt::Message::new(DFLT_TOPICS[1], content.clone(), QOS);
    } else {
        println!("Publishing messages on the {:?} topic", DFLT_TOPICS[0]);
    }
    let tok = cli.publish(msg);

			if let Err(e) = tok {
					println!("Error sending message: {:?}", e);
					break;
			}
}

訂閱消息

在客戶端連接之前,需要先初始化消費(fèi)者。這里我們會循環(huán)處理消費(fèi)者中的消息隊列,并打印出訂閱的 topic 名稱及接收到的消息內(nèi)容。

fn subscribe_topics(cli: &mqtt::Client) {
    if let Err(e) = cli.subscribe_many(DFLT_TOPICS, DFLT_QOS) {
        println!("Error subscribes topics: {:?}", e);
        process::exit(1);
    }
}

fn main() {
  	...
    // Initialize the consumer before connecting.
    let rx = cli.start_consuming();
  	...
    // Subscribe topics.
    subscribe_topics(&cli);

    println!("Processing requests...");
    for msg in rx.iter() {
        if let Some(msg) = msg {
            println!("{}", msg);
        }
        else if !cli.is_connected() {
            if try_reconnect(&cli) {
                println!("Resubscribe topics...");
                subscribe_topics(&cli);
            } else {
                break;
            }
        }
    }
  	...
}

完整代碼

消息發(fā)布代碼

use std::{
    env,
    process,
    time::Duration
};

extern crate paho_mqtt as mqtt;

const DFLT_BROKER:&str = "tcp://broker.emqx.io:1883";
const DFLT_CLIENT:&str = "rust_publish";
const DFLT_TOPICS:&[&str] = &["rust/mqtt", "rust/test"];
// Define the qos.
const QOS:i32 = 1;

fn main() {
    let host = env::args().nth(1).unwrap_or_else(||
        DFLT_BROKER.to_string()
    );

    // Define the set of options for the create.
    // Use an ID for a persistent session.
    let create_opts = mqtt::CreateOptionsBuilder::new()
        .server_uri(host)
        .client_id(DFLT_CLIENT.to_string())
        .finalize();

    // Create a client.
    let cli = mqtt::Client::new(create_opts).unwrap_or_else(|err| {
        println!("Error creating the client: {:?}", err);
        process::exit(1);
    });

    // Define the set of options for the connection.
    let conn_opts = mqtt::ConnectOptionsBuilder::new()
        .keep_alive_interval(Duration::from_secs(20))
        .clean_session(true)
        .finalize();

    // Connect and wait for it to complete or fail.
    if let Err(e) = cli.connect(conn_opts) {
        println!("Unable to connect:\n\t{:?}", e);
        process::exit(1);
    }

    // Create a message and publish it.
    // Publish message to 'test' and 'hello' topics.
    for num in 0..5 {
        let content =  "Hello world! ".to_string() + &num.to_string();
        let mut msg = mqtt::Message::new(DFLT_TOPICS[0], content.clone(), QOS);
        if num % 2 == 0 {
            println!("Publishing messages on the {:?} topic", DFLT_TOPICS[1]);
            msg = mqtt::Message::new(DFLT_TOPICS[1], content.clone(), QOS);
        } else {
            println!("Publishing messages on the {:?} topic", DFLT_TOPICS[0]);
        }
        let tok = cli.publish(msg);

				if let Err(e) = tok {
						println!("Error sending message: {:?}", e);
						break;
				}
    }


    // Disconnect from the broker.
    let tok = cli.disconnect(None);
    println!("Disconnect from the broker");
    tok.unwrap();
}

消息訂閱代碼

為了提升使用體驗(yàn),消息訂閱做了斷開重連的處理,并在重新建立連接后對主題進(jìn)行重新訂閱。

use std::{
    env,
    process,
    thread,
    time::Duration
};

extern crate paho_mqtt as mqtt;

const DFLT_BROKER:&str = "tcp://broker.emqx.io:1883";
const DFLT_CLIENT:&str = "rust_subscribe";
const DFLT_TOPICS:&[&str] = &["rust/mqtt", "rust/test"];
// The qos list that match topics above.
const DFLT_QOS:&[i32] = &[0, 1];

// Reconnect to the broker when connection is lost.
fn try_reconnect(cli: &mqtt::Client) -> bool
{
    println!("Connection lost. Waiting to retry connection");
    for _ in 0..12 {
        thread::sleep(Duration::from_millis(5000));
        if cli.reconnect().is_ok() {
            println!("Successfully reconnected");
            return true;
        }
    }
    println!("Unable to reconnect after several attempts.");
    false
}

// Subscribes to multiple topics.
fn subscribe_topics(cli: &mqtt::Client) {
    if let Err(e) = cli.subscribe_many(DFLT_TOPICS, DFLT_QOS) {
        println!("Error subscribes topics: {:?}", e);
        process::exit(1);
    }
}

fn main() {
    let host = env::args().nth(1).unwrap_or_else(||
        DFLT_BROKER.to_string()
    );

    // Define the set of options for the create.
    // Use an ID for a persistent session.
    let create_opts = mqtt::CreateOptionsBuilder::new()
        .server_uri(host)
        .client_id(DFLT_CLIENT.to_string())
        .finalize();

    // Create a client.
    let mut cli = mqtt::Client::new(create_opts).unwrap_or_else(|err| {
        println!("Error creating the client: {:?}", err);
        process::exit(1);
    });

    // Initialize the consumer before connecting.
    let rx = cli.start_consuming();

    // Define the set of options for the connection.
    let lwt = mqtt::MessageBuilder::new()
        .topic("test")
        .payload("Consumer lost connection")
        .finalize();
    let conn_opts = mqtt::ConnectOptionsBuilder::new()
        .keep_alive_interval(Duration::from_secs(20))
        .clean_session(false)
        .will_message(lwt)
        .finalize();

    // Connect and wait for it to complete or fail.
    if let Err(e) = cli.connect(conn_opts) {
        println!("Unable to connect:\n\t{:?}", e);
        process::exit(1);
    }

    // Subscribe topics.
    subscribe_topics(&cli);

    println!("Processing requests...");
    for msg in rx.iter() {
        if let Some(msg) = msg {
            println!("{}", msg);
        }
        else if !cli.is_connected() {
            if try_reconnect(&cli) {
                println!("Resubscribe topics...");
                subscribe_topics(&cli);
            } else {
                break;
            }
        }
    }

    // If still connected, then disconnect now.
    if cli.is_connected() {
        println!("Disconnecting");
        cli.unsubscribe_many(DFLT_TOPICS).unwrap();
        cli.disconnect(None).unwrap();
    }
    println!("Exiting");
}

運(yùn)行與測試

編譯二進(jìn)制文件

執(zhí)行以下命令,會在 mqtt-example/target/debug 目錄下生成消息訂閱、發(fā)布對應(yīng)的 sub、pub 二進(jìn)制文件。

cargo build

怎么在 Rust 中使用 MQTT

執(zhí)行 sub 二進(jìn)制文件,等待消費(fèi)發(fā)布。

怎么在 Rust 中使用 MQTT

消息發(fā)布

執(zhí)行 pub 二進(jìn)制文件,可以看到分別往 rust/test 、rust/mqtt 這兩個主題發(fā)布了消息。

怎么在 Rust 中使用 MQTT 同時在消息訂閱中可看到發(fā)布的消息

怎么在 Rust 中使用 MQTT

至此,我們完成了使用 paho-mqtt 客戶端連接到 公共 MQTT 服務(wù)器,并實(shí)現(xiàn)了測試客戶端與 MQTT 服務(wù)器的連接、消息發(fā)布和訂閱。

到此,相信大家對“怎么在 Rust 中使用 MQTT”有了更深的了解,不妨來實(shí)際操作一番吧!這里是億速云網(wǎng)站,更多相關(guān)內(nèi)容可以進(jìn)入相關(guān)頻道進(jìn)行查詢,關(guān)注我們,繼續(xù)學(xué)習(xí)!

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報,并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI