溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點(diǎn)擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

基于MQTT怎么對接EMQ-X服務(wù)器

發(fā)布時(shí)間:2021-12-07 09:18:54 來源:億速云 閱讀:246 作者:iii 欄目:互聯(lián)網(wǎng)科技

本篇內(nèi)容介紹了“基于MQTT怎么對接EMQ-X服務(wù)器”的有關(guān)知識,在實(shí)際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領(lǐng)大家學(xué)習(xí)一下如何處理這些情況吧!希望大家仔細(xì)閱讀,能夠?qū)W有所成!

1. LiteOS MQTT組件

概述

MQTT AL用來解耦基于MQTT的業(yè)務(wù)和MQTT的具體實(shí)現(xiàn),具體來說以后的MQTT業(yè)務(wù)層應(yīng)該有且只能使用MQTT AL提供的相關(guān)功能(API 數(shù)據(jù)結(jié)構(gòu) 流程等)。MQTT AL定義MQTT的標(biāo)準(zhǔn),用來屏蔽各個(gè)MQTT協(xié)議實(shí)現(xiàn)的差異(如軟件庫 或者硬件),讓上層業(yè)務(wù)無需關(guān)心MQTT的實(shí)現(xiàn)部分。

MQTT AL的api接口聲明在<mqtt_al.h>中,使用相關(guān)的接口需要包含該頭文件,關(guān)于函數(shù)的詳細(xì)參數(shù)請參考該頭文件的聲明。

配置并連接

對接服務(wù)器的所有信息保存在結(jié)構(gòu)體mqtt_al_conpara_t中,其定義在mqtt_al.h中,如下:

/** @brief defines the paramter for the mqtt connect */
typedef struct
{
	mqtt_al_string_t               serveraddr;   ///< mqtt server:support domain name and dot format
	int                            serverport;   ///< mqtt server port
	mqtt_al_security_para_t       *security;     ///< if NULL,will use en_mqtt_security_none
	en_mqtt_al_verison             version;      ///< mqtt version will be used
	mqtt_al_string_t               clientid;     ///< mqtt connect client identifier
	mqtt_al_string_t               user;         ///< mqtt connect user
	mqtt_al_string_t               passwd;       ///< mqtt connect passwd
	int                            cleansession; ///< 1 clean the session while 0 not
	mqtt_al_willmsg_t             *willmsg;      ///< mqtt connect will message
	unsigned short                 keepalivetime;///< keep alive time
	char                           conret;       ///< mqtt connect code, return by server
	int                            timeout;      ///< how much time will be blocked
}mqtt_al_conpara_t;

其中的一些參數(shù)值已經(jīng)使用枚舉給出:

  • security:安全連接參數(shù)(使用此需要確保mbedtls組件開啟)

枚舉值如下:

/** @brief  this enum all the transport encode we support now*/
typedef enum
{
	en_mqtt_al_security_none = 0,    ///< no encode
	en_mqtt_al_security_psk,         ///< use the psk mode in transport layer
	en_mqtt_al_security_cas,	     ///< use the ca mode in transport layer,only check the server
	en_mqtt_al_security_cacs,	     ///< use the ca mode in transport layer,both check the server and client
	en_mqtt_al_security_end,         ///< the end for the mqtt
}en_mqtt_al_security_t;
  • version:使用的MQTT協(xié)議版本

枚舉值如下:

/** @brief enum the mqtt version*/
typedef enum
{
	en_mqtt_al_version_3_1_0 = 0,
	en_mqtt_al_version_3_1_1,
}en_mqtt_al_verison;

另外,在復(fù)制的時(shí)候還需要注意,很多字符串參數(shù)都是使用mqtt_al_string_t類型,其定義如下:

/** brief defines for all the ascii or data used in the mqtt engine */
typedef struct
{
	char *data;      ///< buffer to storage the data
	int   len;       ///< buffer data length
}mqtt_al_string_t;   //used to represent any type string (maybe not ascii)

在配置結(jié)構(gòu)體完成之后,調(diào)用配置函數(shù)進(jìn)行配置并連接,API如下:

/**
 *@brief: you could use this function to connect to the mqtt server
 *
 *@param[in] conparam  the parameter we will use in connect, refer to the data mqtt_al_conpara_t
 *@
 *@return: first you should check the return value then the return code in conparam
 *
 *@retval NULL which means you could not get the connect to the server,maybe network reason
 *@retval handle, which means you get the context, please check the conparam for more
 */
void * mqtt_al_connect( mqtt_al_conpara_t *conparam);

連接之后,首先應(yīng)該檢查返回的handle指針是否為空,其次應(yīng)該檢查mqtt_al_conpara_t結(jié)構(gòu)體中conret的值,有以下枚舉值:

/** @brief defines for the mqtt connect code returned by the server */
#define cn_mqtt_al_con_code_ok                0   ///< has been accepted by the server
#define cn_mqtt_al_con_code_err_version       1   ///< server not support the version
#define cn_mqtt_al_con_code_err_clientID      2   ///< client identifier is error
#define cn_mqtt_al_con_code_err_netrefuse     3   ///< server service not ready yet
#define cn_mqtt_al_con_code_err_u_p           4   ///< bad user name or password
#define cn_mqtt_al_con_code_err_auth          5   ///< the client is not authorized
#define cn_mqtt_al_con_code_err_unkown        -1  ///< unknown reason
#define cn_mqtt_al_con_code_err_network      0x80 ///< network reason,you could try once more

訂閱消息

EMQ-X服務(wù)器有心跳機(jī)制,實(shí)際應(yīng)用中訂閱之前應(yīng)該先檢查連接狀態(tài),本實(shí)驗(yàn)中暫不檢查。

連接成功后,首先訂閱消息,設(shè)置回調(diào)函數(shù),方便接收下發(fā)的命令。

訂閱消息的API如下:

/**
 * @brief you could use this function subscribe a topic from the server
 *
 * @param[in] handle the handle we get from mqtt_al_connect
 *
 * @param[in] subpara  refer to the data mqtt_al_subpara_t
 *
 * @return 0 success  -1  failed
 *
 */
int mqtt_al_subscribe(void *handle, mqtt_al_subpara_t *subpara);

兩個(gè)參數(shù)中,handle參數(shù)是之前使用mqtt_al_connect時(shí)返回的指針,直接傳入即可,subpara參數(shù)需要重點(diǎn)講述。

mqtt_al_subpara_t的定義如下:

/** @brief defines the mqtt subscribe parameter*/
typedef struct
{
	mqtt_al_string_t       topic;     ///< topic will be subscribe
	en_mqtt_al_qos_t       qos;       ///< qos requested
	fn_mqtt_al_msg_dealer  dealer;    ///< message dealer:used to deal the received message
	void                  *arg;       ///< used for the message dealer
	char                   subret;    ///< subscribe result code
	int                    timeout;   ///< how much time will be blocked
}mqtt_al_subpara_t;

其中訂閱消息質(zhì)量qos的枚舉值如下:

/** @brief enum all the qos supported for the application */
typedef enum
{
	en_mqtt_al_qos_0 = 0,     ///< mqtt QOS 0
	en_mqtt_al_qos_1,         ///< mqtt QOS 1
	en_mqtt_al_qos_2,         ///< mqtt QOS 2
	en_mqtt_al_qos_err
}en_mqtt_al_qos_t;

dealer是一個(gè)函數(shù)指針,接收到下發(fā)命令之后會被回調(diào),arg是回調(diào)函數(shù)參數(shù),其定義如下:

/** @brief  defines the mqtt received message dealer, called by mqtt engine*/
typedef void (*fn_mqtt_al_msg_dealer)(void *arg,mqtt_al_msgrcv_t *msg);

訂閱之后,可以通過mqtt_al_subpara_t結(jié)構(gòu)體中的subret值查看是否訂閱成功。

發(fā)布消息

發(fā)布消息的API如下:

/**
 * @brief you could use this function to publish a message to the server
 *
 * @param[in] handle the handle we get from mqtt_al_connect
 *
 * @param[in] msg  the message we will publish, see the data mqtt_al_pubpara_t
 *
 * @return 0 success  -1  failed
 *
 */
int	mqtt_al_publish(void *handle, mqtt_al_pubpara_t *pubpara);

兩個(gè)參數(shù)中,handle參數(shù)是之前使用mqtt_al_connect時(shí)返回的指針,直接傳入即可,pubpara參數(shù)需要重點(diǎn)講述。

mqtt_al_pubpara_t的定義如下:

/** @brief defines for the mqtt publish */
typedef struct
{
	mqtt_al_string_t    topic;    ///< selected publish topic
	mqtt_al_string_t    msg;      ///< message to be published
	en_mqtt_al_qos_t    qos;      ///< message qos
	int                 retain;   ///< message retain :1 retain while 0 not
	int                 timeout;  ///< how much time will blocked
}mqtt_al_pubpara_t;

MQTT組件自動初始化

MQTT在配置之后,會自動初始化。

在SDK目錄中的IoT_LINK_1.0.0\iot_link\link_main.c文件中可以看到:

基于MQTT怎么對接EMQ-X服務(wù)器

2. 配置準(zhǔn)備

Makefile配置

因?yàn)楸敬螌?shí)驗(yàn)用到的組件較多:

  • AT框架

  • ESP8266設(shè)備驅(qū)動

  • 串口驅(qū)動框架

  • cJSON組件

  • SAL組件

  • MQTT組件

這些實(shí)驗(yàn)代碼全部編譯下來,有350KB,而小熊派開發(fā)板所使用的主控芯片STM32L431RCT6的 Flash 僅有256KB,會導(dǎo)致編譯器無法鏈接出可執(zhí)行文件,所以要在makefile中修改優(yōu)化選項(xiàng),修改為-Os參數(shù),即最大限度的優(yōu)化代碼尺寸,并去掉-g參數(shù),即代碼只能下載運(yùn)行,無法調(diào)試,如圖:

基于MQTT怎么對接EMQ-X服務(wù)器

ESP8266設(shè)備配置

在工程目錄中的OS_CONFIG/iot_link_config.h文件中,配置ESP8266設(shè)備的波特率和設(shè)備名稱:

基于MQTT怎么對接EMQ-X服務(wù)器

WIFI對接信息配置

SDK:C:\Users\Administrator\.icode\sdk\IoT_LINK_1.0.0(其中Administrator是實(shí)驗(yàn)電腦的用戶名)。

在SDK目錄中的iot_link\network\tcpip\esp8266_socket\esp8266_socket_imp.c文件中,配置連接信息:

基于MQTT怎么對接EMQ-X服務(wù)器

之后修改同路徑下的esp8266_socket_imp.mk文件,如圖,將 TOP_DIR 改為 SDK_DIR :

基于MQTT怎么對接EMQ-X服務(wù)器

修改paho_mqtt文件路徑

在SDK目錄中的iot_link\network\mqtt\paho_mqtt\paho_mqtt.mk文件中,如圖,將 TOP_DIR 改為 SDK_DIR :

基于MQTT怎么對接EMQ-X服務(wù)器

3. 使用mqtt.fx對接EMQ-X

配置

對接信息配置如下:

基于MQTT怎么對接EMQ-X服務(wù)器

其中ClientID隨機(jī)生成一個(gè)即可。

訂閱主題

使用mqtt.fx連接客戶端,訂閱本次實(shí)驗(yàn)中的兩個(gè)主題:

  • 主題led_cmd:用于發(fā)布控制命令

  • 主題lightness:用于上報(bào)亮度

基于MQTT怎么對接EMQ-X服務(wù)器

4. 上云實(shí)驗(yàn)

編寫實(shí)驗(yàn)文件

在 Demo 文件夾下創(chuàng)建cloud_test_demo文件夾,在其中創(chuàng)建emqx_mqtt_demo.c文件。

編寫代碼:

#include <osal.h>
#include <mqtt_al.h>
#include <string.h>


#define DEFAULT_LIFETIME            60
#define DEFAULT_SERVER_IPV4         "122.51.89.94"
#define DEFAULT_SERVER_PORT         1883
#define CN_MQTT_EP_CLIENTID         "emqx-test-001"
#define CN_MQTT_EP_USERNAME         "mculover666"
#define CN_MQTT_EP_PASSWD           "123456789"
#define CN_MQTT_EP_SUB_TOPIC1       "led_cmd"
#define CN_MQTT_EP_PUB_TOPIC1       "lightness"

#define recv_buf_len 100
static char recv_buffer[recv_buf_len];   //下發(fā)數(shù)據(jù)接收緩沖區(qū)
static int  recv_datalen;                //表示接收數(shù)據(jù)長度

osal_semp_t recv_sync;  //命令接收回調(diào)函數(shù)和處理函數(shù)之間的信號量

char lightness_buf[10];

static void mqtt_al_msg_dealer(void *arg,mqtt_al_msgrcv_t *msg)
{
    if((msg->msg.len) < recv_buf_len)
    {
        //保存數(shù)據(jù)
        memcpy(recv_buffer,msg->msg.data,msg->msg.len );
        recv_buffer[msg->msg.len] = '\0';
        recv_datalen = msg->msg.len;
        printf("recv buf: %s.\r\n", recv_buffer);
        //釋放信號量,交由數(shù)據(jù)處理線程進(jìn)行處理
        osal_semp_post(recv_sync);
    }
    else
    {
        printf("recv buf is too small, len = %d.\r\n", msg->msg.len);
    }
}

static int task_recv_cmd_entry(void *args)
{
    while(1)
    {
        /* 阻塞等待信號量 */
        osal_semp_pend(recv_sync,cn_osal_timeout_forever);

        if(strstr(recv_buffer, "on"))
        {
                printf("-----------------LED ON !!! --------------------\r\n");
        }
        else if(strstr(recv_buffer, "off"))
        {
                printf("-----------------LED OFF !!! --------------------\r\n");
        }
    }
    return 0;
}

static int task_report_msg_entry(void *args)
{
    int ret = -1;
    void *handle = NULL;

    mqtt_al_conpara_t config;
    mqtt_al_string_t str_temp;
    mqtt_al_subpara_t subpara_led_cmd;

    mqtt_al_pubpara_t pubpara_lightness;

    int lightness_value = 0;


    /* 配置結(jié)構(gòu)體 */
    str_temp.data = DEFAULT_SERVER_IPV4;
    str_temp.len  = sizeof(DEFAULT_SERVER_IPV4);
    config.serveraddr = str_temp;
    config.serverport = DEFAULT_SERVER_PORT;
    config.security   = en_mqtt_al_security_none;
    config.version    = en_mqtt_al_version_3_1_0;
    str_temp.data = CN_MQTT_EP_CLIENTID;
    str_temp.len  = sizeof(CN_MQTT_EP_CLIENTID);
    config.clientid   = str_temp;
    str_temp.data = CN_MQTT_EP_USERNAME;
    str_temp.len  = sizeof(CN_MQTT_EP_USERNAME);
    config.user       = str_temp;
    str_temp.data = CN_MQTT_EP_PASSWD;
    str_temp.len  = sizeof(CN_MQTT_EP_PASSWD);
    config.passwd     = str_temp;
    config.cleansession = 1;
    config.willmsg    = NULL;
    config.keepalivetime = DEFAULT_LIFETIME;
    config.timeout    = 30;

    /* 配置并連接服務(wù)器 */
    handle = mqtt_al_connect(&config);
    if(handle == NULL)
    {
        /* 連接出錯 */
        printf("config error.\r\n");
        return -1;
    }
    else
    {
        /* 進(jìn)一步檢查服務(wù)器返回值 */
        if(config.conret != cn_mqtt_al_con_code_ok)
        {
            /* 服務(wù)器返回值出錯 */
            printf("server return error, conret = %d.\r\n", config.conret);
            return -1;
        }
        else
        {
            printf("connect to server success.\r\n");
        }
    }


    /* 連接成功后,訂閱led_cmd主題消息 */
    str_temp.data = CN_MQTT_EP_SUB_TOPIC1;
    str_temp.len  = sizeof(CN_MQTT_EP_SUB_TOPIC1);
    subpara_led_cmd.topic = str_temp;
    subpara_led_cmd.qos = en_mqtt_al_qos_0;
    subpara_led_cmd.dealer = mqtt_al_msg_dealer;
    subpara_led_cmd.arg = NULL;
    subpara_led_cmd.timeout = 60;
    ret =  mqtt_al_subscribe(handle, &subpara_led_cmd);
    if(ret < 0)
    {
        printf("sub topic %s fail.\r\n", subpara_led_cmd.topic.data);
        return -1;
    }
    else
    {
        /* 進(jìn)一步判斷是否訂閱成功 */
        if(cn_mqtt_al_con_code_ok != subpara_led_cmd.subret)
        {
            printf("sub topic %s fail, subret = %d.\r\n", subpara_led_cmd.topic.data, subpara_led_cmd.subret);
            return -1;
        }
        else
        {
            printf("sub topic %s success.\r\n", subpara_led_cmd.topic.data);
        }
    }
    
    /* 每隔10s上報(bào)一次數(shù)據(jù) */
    str_temp.data = CN_MQTT_EP_PUB_TOPIC1;
    str_temp.len  = sizeof(CN_MQTT_EP_PUB_TOPIC1);
    pubpara_lightness.topic = str_temp;
    pubpara_lightness.qos = en_mqtt_al_qos_0;
    pubpara_lightness.retain = 0;
    pubpara_lightness.timeout = 30;
    while(1)
    {
        sprintf(lightness_buf, "%d", lightness_value);
        str_temp.data = lightness_buf;
        str_temp.len  = strlen(lightness_buf);
        pubpara_lightness.msg = str_temp;

        ret = mqtt_al_publish(handle, &pubpara_lightness);
        if(ret < 0)
        {
            printf("publish topic %s fail.\r\n", pubpara_lightness.topic.data);
            return -1;
        }
        else
        {
            printf("publish topic %s success. payload = %s, lightness = %d.\r\n", pubpara_lightness.topic.data, pubpara_lightness.msg.data, lightness_value);
        }
        lightness_value++;
        osal_task_sleep(10*1000);
    }
}


int standard_app_demo_main()
{ 
    /* 創(chuàng)建信號量 */
    osal_semp_create(&recv_sync,1,0);

    /* 創(chuàng)建任務(wù) */
    osal_task_create("task_reportmsg",task_report_msg_entry,NULL,0x800,NULL,8);
    osal_task_create("task_recv_cmd",task_recv_cmd_entry,NULL,0x400,NULL,8);

    return 0;
}

添加路徑

在user_demo.mk中添加如下:

	#example for emqx_mqtt_demo
	ifeq ($(CONFIG_USER_DEMO), "emqx_mqtt_demo")	
		user_demo_src  = ${wildcard $(TOP_DIR)/targets/STM32L431_BearPi/Demos/cloud_test_demo/emqx_mqtt_demo.c}
	endif

添加位置如下:

基于MQTT怎么對接EMQ-X服務(wù)器

配置.sdkconfig

基于MQTT怎么對接EMQ-X服務(wù)器

特別說明:實(shí)驗(yàn)時(shí)需要關(guān)閉shell組件,否則會因動態(tài)內(nèi)存分配失敗而無法連接。

數(shù)據(jù)上報(bào)實(shí)驗(yàn)結(jié)果

編譯下載之后,可以在串口助手中看到輸出信息:

基于MQTT怎么對接EMQ-X服務(wù)器

在訂閱了該主題的客戶端也可以看到上報(bào)數(shù)據(jù):

基于MQTT怎么對接EMQ-X服務(wù)器

命令下發(fā)實(shí)驗(yàn)結(jié)果

在mqtt.fx中下發(fā)一條開啟命令:

基于MQTT怎么對接EMQ-X服務(wù)器

可以看到設(shè)備后作出回應(yīng):

基于MQTT怎么對接EMQ-X服務(wù)器

再下發(fā)一條關(guān)閉命令:

基于MQTT怎么對接EMQ-X服務(wù)器

可以看到設(shè)備后作出回應(yīng):

基于MQTT怎么對接EMQ-X服務(wù)器

“基于MQTT怎么對接EMQ-X服務(wù)器”的內(nèi)容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關(guān)的知識可以關(guān)注億速云網(wǎng)站,小編將為大家輸出更多高質(zhì)量的實(shí)用文章!

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI