溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶(hù)服務(wù)條款》

怎么使用ZeroMQ消息庫(kù)在C和Python間共享數(shù)據(jù)

發(fā)布時(shí)間:2021-10-29 10:16:31 來(lái)源:億速云 閱讀:144 作者:iii 欄目:編程語(yǔ)言

本篇內(nèi)容介紹了“怎么使用ZeroMQ消息庫(kù)在C和Python間共享數(shù)據(jù)”的有關(guān)知識(shí),在實(shí)際案例的操作過(guò)程中,不少人都會(huì)遇到這樣的困境,接下來(lái)就讓小編帶領(lǐng)大家學(xué)習(xí)一下如何處理這些情況吧!希望大家仔細(xì)閱讀,能夠?qū)W有所成!



ZeroMQ 提供了一個(gè)更簡(jiǎn)單的過(guò)程:

  1. 編寫(xiě)一小段 C 代碼,從硬件讀取數(shù)據(jù),然后把發(fā)現(xiàn)的東西作為消息發(fā)送出去。

  2. 使用 Python 編寫(xiě)接口,實(shí)現(xiàn)新舊基礎(chǔ)設(shè)施之間的對(duì)接。

Pieter Hintjens 是 ZeroMQ 項(xiàng)目發(fā)起者之一,他是個(gè)擁有 有趣視角和作品 的非凡人物。

準(zhǔn)備

本教程中,需要:

  • 一個(gè) C 編譯器(例如 GCC 或 Clang)

  • libzmq 庫(kù)

  • Python 3

  • ZeroMQ 的 Python 封裝

Fedora 系統(tǒng)上的安裝方法:

$ dnf install clang zeromq zeromq-devel python3 python3-zmq

Debian 和 Ubuntu 系統(tǒng)上的安裝方法:

$ apt-get install clang libzmq5 libzmq3-dev python3 python3-zmq

如果有問(wèn)題,參考對(duì)應(yīng)項(xiàng)目的安裝指南(上面附有鏈接)。

編寫(xiě)硬件接口庫(kù)

因?yàn)檫@里針對(duì)的是個(gè)設(shè)想的場(chǎng)景,本教程虛構(gòu)了包含兩個(gè)函數(shù)的操作庫(kù):

  • fancyhw_init() 用來(lái)初始化(設(shè)想的)硬件

  • fancyhw_read_val() 用于返回從硬件讀取的數(shù)據(jù)

將庫(kù)的完整代碼保存到文件 libfancyhw.h 中:

#ifndef LIBFANCYHW_H#define LIBFANCYHW_H #include <stdlib.h>#include <stdint.h> // This is the fictitious hardware interfacing library void fancyhw_init(unsigned int init_param){    srand(init_param);} int16_t fancyhw_read_val(void){    return (int16_t)rand();} #endif

這個(gè)庫(kù)可以模擬你要在不同語(yǔ)言實(shí)現(xiàn)的組件間交換的數(shù)據(jù),中間有個(gè)隨機(jī)數(shù)發(fā)生器。

設(shè)計(jì) C 接口

下面從包含管理數(shù)據(jù)傳輸?shù)膸?kù)開(kāi)始,逐步實(shí)現(xiàn) C 接口。

需要的庫(kù)

開(kāi)始先加載必要的庫(kù)(每個(gè)庫(kù)的作用見(jiàn)代碼注釋?zhuān)?/p>

// For printf()#include <stdio.h>// For EXIT_*#include <stdlib.h>// For memcpy()#include <string.h>// For sleep()#include <unistd.h> #include <zmq.h> #include "libfancyhw.h"
必要的參數(shù)

定義 main 函數(shù)和后續(xù)過(guò)程中必要的參數(shù):

int main(void){    const unsigned int INIT_PARAM = 12345;    const unsigned int REPETITIONS = 10;    const unsigned int PACKET_SIZE = 16;    const char *TOPIC = "fancyhw_data";     ...
初始化

所有的庫(kù)都需要初始化。虛構(gòu)的那個(gè)只需要一個(gè)參數(shù):

fancyhw_init(INIT_PARAM);

ZeroMQ 庫(kù)需要實(shí)打?qū)嵉某跏蓟?。首先,定義對(duì)象 context,它是用來(lái)管理全部的套接字的:

void *context = zmq_ctx_new(); if (!context){    printf("ERROR: ZeroMQ error occurred during zmq_ctx_new(): %s\n", zmq_strerror(errno));     return EXIT_FAILURE;}

之后定義用來(lái)發(fā)送數(shù)據(jù)的套接字。ZeroMQ 支持若干種套接字,各有其用。使用 publish 套接字(也叫 PUB 套接字),可以復(fù)制消息并分發(fā)到多個(gè)接收端。這使得你可以讓多個(gè)接收端接收同一個(gè)消息。沒(méi)有接收者的消息將被丟棄(即不會(huì)入消息隊(duì)列)。用法如下:

void *data_socket = zmq_socket(context, ZMQ_PUB);

套接字需要綁定到一個(gè)具體的地址,這樣客戶(hù)端就知道要連接哪里了。本例中,使用了 TCP 傳輸層(當(dāng)然也有 其它選項(xiàng),但 TCP 是不錯(cuò)的默認(rèn)選擇):

const int rb = zmq_bind(data_socket, "tcp://*:5555"); if (rb != 0){    printf("ERROR: ZeroMQ error occurred during zmq_ctx_new(): %s\n", zmq_strerror(errno));     return EXIT_FAILURE;}

下一步, 計(jì)算一些后續(xù)要用到的值。 注意下面代碼中的 TOPIC,因?yàn)?nbsp;PUB 套接字發(fā)送的消息需要綁定一個(gè)主題。主題用于供接收者過(guò)濾消息:

const size_t topic_size = strlen(TOPIC);const size_t envelope_size = topic_size + 1 + PACKET_SIZE * sizeof(int16_t); printf("Topic: %s; topic size: %zu; Envelope size: %zu\n", TOPIC, topic_size, envelope_size);
發(fā)送消息

啟動(dòng)一個(gè)發(fā)送消息的循環(huán),循環(huán) REPETITIONS 次:

for (unsigned int i = 0; i < REPETITIONS; i++){    ...

發(fā)送消息前,先填充一個(gè)長(zhǎng)度為 PACKET_SIZE 的緩沖區(qū)。本庫(kù)提供的是 16 個(gè)位的有符號(hào)整數(shù)。因?yàn)?C 語(yǔ)言中 int 類(lèi)型占用空間大小與平臺(tái)相關(guān),不是確定的值,所以要使用指定寬度的 int 變量:

int16_t buffer[PACKET_SIZE]; for (unsigned int j = 0; j < PACKET_SIZE; j++){    buffer[j] = fancyhw_read_val();} printf("Read %u data values\n", PACKET_SIZE);

消息的準(zhǔn)備和發(fā)送的第一步是創(chuàng)建 ZeroMQ 消息,為消息分配必要的內(nèi)存空間??瞻椎南⑹怯糜诜庋b要發(fā)送的數(shù)據(jù)的:

zmq_msg_t envelope; const int rmi = zmq_msg_init_size(&envelope, envelope_size);if (rmi != 0){    printf("ERROR: ZeroMQ error occurred during zmq_msg_init_size(): %s\n", zmq_strerror(errno));     zmq_msg_close(&envelope);     break;}

現(xiàn)在內(nèi)存空間已分配,數(shù)據(jù)保存在 ZeroMQ 消息 “信封”中。函數(shù) zmq_msg_data() 返回一個(gè)指向封裝數(shù)據(jù)緩存區(qū)頂端的指針。第一部分是主題,之后是一個(gè)空格,最后是二進(jìn)制數(shù)。主題和二進(jìn)制數(shù)據(jù)之間的分隔符采用空格字符。需要遍歷緩存區(qū)的話(huà),使用類(lèi)型轉(zhuǎn)換和 指針?biāo)惴?。(感謝 C 語(yǔ)言,讓事情變得直截了當(dāng)。)做法如下:

memcpy(zmq_msg_data(&envelope), TOPIC, topic_size);memcpy((void*)((char*)zmq_msg_data(&envelope) + topic_size), " ", 1);memcpy((void*)((char*)zmq_msg_data(&envelope) + 1 + topic_size), buffer, PACKET_SIZE * sizeof(int16_t))

通過(guò) data_socket 發(fā)送消息:

const size_t rs = zmq_msg_send(&envelope, data_socket, 0);if (rs != envelope_size){    printf("ERROR: ZeroMQ error occurred during zmq_msg_send(): %s\n", zmq_strerror(errno));     zmq_msg_close(&envelope);     break;}

使用數(shù)據(jù)之前要先解除封裝:

zmq_msg_close(&envelope); printf("Message sent; i: %u, topic: %s\n", i, TOPIC);
清理

C 語(yǔ)言不提供 垃圾收集 功能,用完之后記得要自己掃尾。發(fā)送消息之后結(jié)束程序之前,需要運(yùn)行掃尾代碼,釋放分配的內(nèi)存:

const int rc = zmq_close(data_socket); if (rc != 0){    printf("ERROR: ZeroMQ error occurred during zmq_close(): %s\n", zmq_strerror(errno));     return EXIT_FAILURE;} const int rd = zmq_ctx_destroy(context); if (rd != 0){    printf("Error occurred during zmq_ctx_destroy(): %s\n", zmq_strerror(errno));     return EXIT_FAILURE;} return EXIT_SUCCESS;
完整 C 代碼

保存下面完整的接口代碼到本地名為 hw_interface.c 的文件:

// For printf()#include <stdio.h>// For EXIT_*#include <stdlib.h>// For memcpy()#include <string.h>// For sleep()#include <unistd.h> #include <zmq.h> #include "libfancyhw.h" int main(void){    const unsigned int INIT_PARAM = 12345;    const unsigned int REPETITIONS = 10;    const unsigned int PACKET_SIZE = 16;    const char *TOPIC = "fancyhw_data";     fancyhw_init(INIT_PARAM);     void *context = zmq_ctx_new();     if (!context)    {        printf("ERROR: ZeroMQ error occurred during zmq_ctx_new(): %s\n", zmq_strerror(errno));         return EXIT_FAILURE;    }     void *data_socket = zmq_socket(context, ZMQ_PUB);     const int rb = zmq_bind(data_socket, "tcp://*:5555");     if (rb != 0)    {        printf("ERROR: ZeroMQ error occurred during zmq_ctx_new(): %s\n", zmq_strerror(errno));         return EXIT_FAILURE;    }     const size_t topic_size = strlen(TOPIC);    const size_t envelope_size = topic_size + 1 + PACKET_SIZE * sizeof(int16_t);     printf("Topic: %s; topic size: %zu; Envelope size: %zu\n", TOPIC, topic_size, envelope_size);     for (unsigned int i = 0; i < REPETITIONS; i++)    {        int16_t buffer[PACKET_SIZE];         for (unsigned int j = 0; j < PACKET_SIZE; j++)        {            buffer[j] = fancyhw_read_val();        }         printf("Read %u data values\n", PACKET_SIZE);         zmq_msg_t envelope;           const int rmi = zmq_msg_init_size(&envelope, envelope_size);        if (rmi != 0)        {            printf("ERROR: ZeroMQ error occurred during zmq_msg_init_size(): %s\n", zmq_strerror(errno));               zmq_msg_close(&envelope);               break;        }               memcpy(zmq_msg_data(&envelope), TOPIC, topic_size);         memcpy((void*)((char*)zmq_msg_data(&envelope) + topic_size), " ", 1);         memcpy((void*)((char*)zmq_msg_data(&envelope) + 1 + topic_size), buffer, PACKET_SIZE * sizeof(int16_t));           const size_t rs = zmq_msg_send(&envelope, data_socket, 0);        if (rs != envelope_size)        {            printf("ERROR: ZeroMQ error occurred during zmq_msg_send(): %s\n", zmq_strerror(errno));               zmq_msg_close(&envelope);               break;        }           zmq_msg_close(&envelope);         printf("Message sent; i: %u, topic: %s\n", i, TOPIC);         sleep(1);    }     const int rc = zmq_close(data_socket);     if (rc != 0)    {        printf("ERROR: ZeroMQ error occurred during zmq_close(): %s\n", zmq_strerror(errno));         return EXIT_FAILURE;    }     const int rd = zmq_ctx_destroy(context);     if (rd != 0)    {        printf("Error occurred during zmq_ctx_destroy(): %s\n", zmq_strerror(errno));         return EXIT_FAILURE;    }     return EXIT_SUCCESS;}

用如下命令編譯:

$ clang -std=c99 -I. hw_interface.c -lzmq -o hw_interface

如果沒(méi)有編譯錯(cuò)誤,你就可以運(yùn)行這個(gè)接口了。貼心的是,ZeroMQ PUB 套接字可以在沒(méi)有任何應(yīng)用發(fā)送或接受數(shù)據(jù)的狀態(tài)下運(yùn)行,這簡(jiǎn)化了使用復(fù)雜度,因?yàn)檫@樣不限制進(jìn)程啟動(dòng)的次序。

運(yùn)行該接口:

$ ./hw_interfaceTopic: fancyhw_data; topic size: 12; Envelope size: 45Read 16 data valuesMessage sent; i: 0, topic: fancyhw_dataRead 16 data valuesMessage sent; i: 1, topic: fancyhw_dataRead 16 data values......

輸出顯示數(shù)據(jù)已經(jīng)通過(guò) ZeroMQ 完成發(fā)送,現(xiàn)在要做的是讓一個(gè)程序去讀數(shù)據(jù)。

編寫(xiě) Python 數(shù)據(jù)處理器

現(xiàn)在已經(jīng)準(zhǔn)備好從 C 程序向 Python 應(yīng)用傳送數(shù)據(jù)了。

庫(kù)

需要兩個(gè)庫(kù)幫助實(shí)現(xiàn)數(shù)據(jù)傳輸。首先是 ZeroMQ 的 Python 封裝:

$ python3 -m pip install zmq

另一個(gè)就是 struct 庫(kù),用于解碼二進(jìn)制數(shù)據(jù)。這個(gè)庫(kù)是 Python 標(biāo)準(zhǔn)庫(kù)的一部分,所以不需要使用 pip 命令安裝。

Python 程序的第一部分是導(dǎo)入這些庫(kù):

import zmqimport struct
重要參數(shù)

使用 ZeroMQ 時(shí),只能向常量 TOPIC 定義相同的接收端發(fā)送消息:

topic = "fancyhw_data".encode('ascii') print("Reading messages with topic: {}".format(topic))
初始化

下一步,初始化上下文和套接字。使用 subscribe 套接字(也稱(chēng)為 SUB 套接字),它是 PUB 套接字的天生伴侶。這個(gè)套接字發(fā)送時(shí)也需要匹配主題。

with zmq.Context() as context:    socket = context.socket(zmq.SUB)     socket.connect("tcp://127.0.0.1:5555")    socket.setsockopt(zmq.SUBSCRIBE, topic)     i = 0     ...
接收消息

啟動(dòng)一個(gè)無(wú)限循環(huán),等待接收發(fā)送到 SUB 套接字的新消息。這個(gè)循環(huán)會(huì)在你按下 Ctrl+C 組合鍵或者內(nèi)部發(fā)生錯(cuò)誤時(shí)終止:

    try:        while True:             ... # we will fill this in next     except KeyboardInterrupt:        socket.close()    except Exception as error:        print("ERROR: {}".format(error))        socket.close()

這個(gè)循環(huán)等待 recv() 方法獲取的新消息,然后將接收到的內(nèi)容從第一個(gè)空格字符處分割開(kāi),從而得到主題:

binary_topic, data_buffer = socket.recv().split(b' ', 1)
解碼消息

Python 此時(shí)尚不知道主題是個(gè)字符串,使用標(biāo)準(zhǔn) ASCII 編解碼器進(jìn)行解碼:

topic = binary_topic.decode(encoding = 'ascii') print("Message {:d}:".format(i))print("\ttopic: '{}'".format(topic))

下一步就是使用 struct 庫(kù)讀取二進(jìn)制數(shù)據(jù),它可以將二進(jìn)制數(shù)據(jù)段轉(zhuǎn)換為明確的數(shù)值。首先,計(jì)算數(shù)據(jù)包中數(shù)值的組數(shù)。本例中使用的 16 個(gè)位的有符號(hào)整數(shù)對(duì)應(yīng)的是 struct 格式字符 中的 h

packet_size = len(data_buffer) // struct.calcsize("h") print("\tpacket size: {:d}".format(packet_size))

知道數(shù)據(jù)包中有多少組數(shù)據(jù)后,就可以通過(guò)構(gòu)建一個(gè)包含數(shù)據(jù)組數(shù)和數(shù)據(jù)類(lèi)型的字符串,來(lái)定義格式了(比如“16h”):

struct_format = "{:d}h".format(packet_size)

將二進(jìn)制數(shù)據(jù)串轉(zhuǎn)換為可直接打印的一系列數(shù)字:

data = struct.unpack(struct_format, data_buffer) print("\tdata: {}".format(data))
完整 Python 代碼

下面是 Python 實(shí)現(xiàn)的完整的接收端:

#! /usr/bin/env python3 import zmqimport struct topic = "fancyhw_data".encode('ascii') print("Reading messages with topic: {}".format(topic)) with zmq.Context() as context:    socket = context.socket(zmq.SUB)     socket.connect("tcp://127.0.0.1:5555")    socket.setsockopt(zmq.SUBSCRIBE, topic)     i = 0     try:        while True:            binary_topic, data_buffer = socket.recv().split(b' ', 1)             topic = binary_topic.decode(encoding = 'ascii')             print("Message {:d}:".format(i))            print("\ttopic: '{}'".format(topic))             packet_size = len(data_buffer) // struct.calcsize("h")             print("\tpacket size: {:d}".format(packet_size))             struct_format = "{:d}h".format(packet_size)             data = struct.unpack(struct_format, data_buffer)             print("\tdata: {}".format(data))             i += 1     except KeyboardInterrupt:        socket.close()    except Exception as error:        print("ERROR: {}".format(error))        socket.close()

將上面的內(nèi)容保存到名為 online_analysis.py 的文件。Python 代碼不需要編譯,你可以直接運(yùn)行它。

運(yùn)行輸出如下:

$ ./online_analysis.pyReading messages with topic: b'fancyhw_data'Message 0:        topic: 'fancyhw_data'        packet size: 16        data: (20946, -23616, 9865, 31416, -15911, -10845, -5332, 25662, 10955, -32501, -18717, -24490, -16511, -28861, 24205, 26568)Message 1:        topic: 'fancyhw_data'        packet size: 16        data: (12505, 31355, 14083, -19654, -9141, 14532, -25591, 31203, 10428, -25564, -732, -7979, 9529, -27982, 29610, 30475)......

“怎么使用ZeroMQ消息庫(kù)在C和Python間共享數(shù)據(jù)”的內(nèi)容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關(guān)的知識(shí)可以關(guān)注億速云網(wǎng)站,小編將為大家輸出更多高質(zhì)量的實(shí)用文章!

向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI