您好,登錄后才能下訂單哦!
本篇內(nèi)容主要講解“Schema Registry的使用教程”,感興趣的朋友不妨來(lái)看看。本文介紹的方法操作簡(jiǎn)單快捷,實(shí)用性強(qiáng)。下面就讓小編來(lái)帶大家學(xué)習(xí)“Schema Registry的使用教程”吧!
物聯(lián)網(wǎng)設(shè)備終端種類(lèi)繁雜,各廠商使用的編碼格式各異,所以在接入物聯(lián)網(wǎng)平臺(tái)的時(shí)候就產(chǎn)生了統(tǒng)一數(shù)據(jù)格式的需求,以便平臺(tái)之上的應(yīng)用進(jìn)行設(shè)備管理。
EMQ X 企業(yè)版 3.4.0 提供了 Schema Registry 功能,提供編解碼能力。Schema Registry 管理編解碼使用的 Schema、處理編碼或解碼請(qǐng)求并返回結(jié)果。Schema Registry 配合規(guī)則引擎,可適配各種場(chǎng)景的設(shè)備接入和規(guī)則設(shè)計(jì)。
下圖展示了 Schema Registry 的一個(gè)應(yīng)用案例。多個(gè)設(shè)備上報(bào)不同格式的數(shù)據(jù),經(jīng)過(guò) Schema Registry 解碼之后,變?yōu)榻y(tǒng)一的內(nèi)部格式,然后轉(zhuǎn)發(fā)給后臺(tái)應(yīng)用。
[圖1: 使用 Schema Registry 對(duì)設(shè)備數(shù)據(jù)進(jìn)行編解碼]
EMQ X 3.4.0 內(nèi)置的 Schema Registry 數(shù)據(jù)格式包括 Avro 和 Protobuf。Avro 和 Protobuf 是依賴 Schema 的數(shù)據(jù)格式,編碼后的數(shù)據(jù)為二進(jìn)制,使用 Schema Registry 解碼后的內(nèi)部數(shù)據(jù)格式(Map,稍后講解) 可直接被規(guī)則引擎和其他插件使用。此外 Schema Registry 支持用戶自定義的 (3rd-party) 編解碼服務(wù),通過(guò) HTTP 或 TCP 回調(diào)的方式,進(jìn)行更加貼近業(yè)務(wù)需求的編解碼。
Schema Registry 為 Avro 和 Protobuf 等內(nèi)置編碼格式維護(hù) Schema 文本,但對(duì)于自定義編解碼 (3rd-party) 格式,如需要 Schema,Schema 文本需由編解碼服務(wù)自己維護(hù)。Schema Registry 為每個(gè) Schema 創(chuàng)建一個(gè) Schema ID,Schema API 提供了通過(guò) Schema ID 的添加、查詢和刪除操作。
Schema Registry 既可以解碼,也可以編碼。編碼和解碼時(shí)需要指定 Schema ID。
[圖2: Schema Registry 架構(gòu)示意圖]
編碼調(diào)用示例:參數(shù)為 Schema
schema_encode(SchemaID, Data) -> RawData
解碼調(diào)用示例:
schema_decode(SchemaID, RawData) -> Data
常見(jiàn)的使用案例是,使用規(guī)則引擎來(lái)調(diào)用 Schema Registry 提供的編碼和解碼接口,然后將編碼或解碼后的數(shù)據(jù)作為后續(xù)動(dòng)作的輸入。
EMQ X 的消息處理層面可分為消息路由(Messaging)、規(guī)則引擎(Rule Engine)、數(shù)據(jù)格式轉(zhuǎn)換(Data Conversion) 三個(gè)部分。
EMQ X 的 PUB/SUB 系統(tǒng)將消息路由到指定的主題。規(guī)則引擎可以靈活地配置數(shù)據(jù)的業(yè)務(wù)規(guī)則,按規(guī)則匹配消息,然后指定相應(yīng)動(dòng)作。數(shù)據(jù)格式轉(zhuǎn)換發(fā)生在規(guī)則匹配的過(guò)程之前,先將數(shù)據(jù)轉(zhuǎn)換為可參與規(guī)則匹配的 Map 格式,然后進(jìn)行匹配。
[圖3: Messaging, Rule Engine and Schema Registry]
規(guī)則引擎內(nèi)部使用的數(shù)據(jù)格式為 Erlang Map,所以如果原數(shù)據(jù)內(nèi)容為二進(jìn)制或者其他格式,必須使用編解碼函數(shù)(比如上面提到的 schema_decode 和 json_decode 函數(shù)) 將其轉(zhuǎn)換為 Map。
Map 是一個(gè) Key-Value 形式的數(shù)據(jù)結(jié)構(gòu),形如 #{key => value}。例如,user = #{id => 1, name => "Steve"}
定義了一個(gè) id
為 1
,name
為 "Steve"
的 user
Map。
SQL 語(yǔ)句提供了 "." 操作符嵌套地提取和添加 Map 字段。下面是使用 SQL 語(yǔ)句對(duì)這個(gè) Map 操作的示例:
SELECT user.id AS my_id
SQL 語(yǔ)句的篩選結(jié)果為 #{my_id => 1}
。
規(guī)則引擎的 SQL 語(yǔ)句提供了對(duì) JSON 格式字符串的編解碼支持,將 JSON 字符串和 Map 格式相互轉(zhuǎn)換的 SQL 函數(shù)為 json_decode() 和 json_encode():
SELECT json_decode(payload) AS p FROM "message.publish" WHERE p.x = p.y, topic ~= "t/#"
上面這個(gè) SQL 語(yǔ)句將會(huì)匹配到 payload 內(nèi)容為 JSON 字符串: {"x" = 1, "y" = 1}
, 并且 topic 為 t/a
的 MQTT 消息。
json_decode(payload) as p
將 JSON 字符串解碼為下面的 Map 數(shù)據(jù)結(jié)構(gòu),從而可以在 WHERE
子句中使用 p.x 和 p.y 使用 Map 中的字段:
#{ p => #{ x => 1, y => 1 } }
注意: AS
子句是必須的,將解碼之后的數(shù)據(jù)賦值給某個(gè)Key,后面才能對(duì)其進(jìn)行后續(xù)操作。
設(shè)備發(fā)布一個(gè)使用 Protobuf 編碼的二進(jìn)制消息,需要通過(guò)規(guī)則引擎匹配過(guò)后,將消息重新發(fā)布到與 "name" 字段相關(guān)的主題上。主題的格式為 "person/${name}"。
比如,將 "name" 字段為 "Shawn" 的消息重新發(fā)布到主題 "person/Shawn"。
在 EMQ X 的 Dashboard 界面,使用下面的參數(shù)創(chuàng)建一個(gè) Protobuf Schema:
名稱:protobuf_person
編解碼類(lèi)型:protobuf
Schema:下面的 protobuf schema 定義了一個(gè) Person 消息。
message Person { required string name = 1; required int32 id = 2; optional string email = 3; }
Schema 創(chuàng)建完成后,emqx 會(huì)分配一個(gè) Schema ID 和 Version。如果是第一次創(chuàng)建 "protobuf_person",Schema ID 為 "protobuf_person:1.0"。
使用剛才創(chuàng)建好的 Schema ID 來(lái)編寫(xiě)規(guī)則 SQL 語(yǔ)句:
SELECT schema_decode('protobuf_person:1.0', payload, 'Person') as person, payload FROM "message.publish" WHERE topic =~ 't/#' and person.name = 'Shawn'
這里的關(guān)鍵點(diǎn)在于 schema_decode('protobuf_person:1.0', payload, 'Person')
:
schema_decode
函數(shù)將 payload 字段的內(nèi)容按照 'protobuf_person:1.0' 這個(gè) Schema 來(lái)做解碼;
as person
將解碼后的值保存到變量 "person" 里;
最后一個(gè)參數(shù) Person
指明了 payload 中的消息的類(lèi)型是 protobuf schema 里定義的 'Person' 類(lèi)型。
然后使用以下參數(shù)添加動(dòng)作:
動(dòng)作類(lèi)型:消息重新發(fā)布
目的主題:person/${person.name}
消息內(nèi)容模板:${person}
這個(gè)動(dòng)作將解碼之后的 "person" 以 JSON 的格式發(fā)送到 person/${person.name}
這個(gè)主題。其中${person.name}
是個(gè)變量占位符,將在運(yùn)行時(shí)被替換為消息內(nèi)容中 "name" 字段的值。
規(guī)則創(chuàng)建好之后,就可以模擬數(shù)據(jù)進(jìn)行測(cè)試了。
下面的代碼使用 Python 語(yǔ)言填充了一個(gè) Person 消息并編碼為二進(jìn)制數(shù)據(jù),然后將其發(fā)送到 "t/1" 主題。詳見(jiàn) 完整代碼。
def publish_msg(client): p = person_pb2.Person() p.id = 1 p.name = "Shawn" p.email = "liuxy@emqx.io" message = p.SerializeToString() topic = "t/1" print("publish to topic: t/1, payload:", message) client.publish(topic, payload=message, qos=0, retain=False)
在 Dashboard 的 Websocket 工具里,登錄一個(gè) MQTT Client 并訂閱 "person/#"。
安裝 python 依賴,并執(zhí)行設(shè)備端代碼:
$ pip3 install protobuf $ pip3 install paho-mqtt $ python3 ./pb2_mqtt.py Connected with result code 0 publish to topic: t/1, payload: b'nx05Shawnx10x01x1arliuxy@emqx.io' t/1 b'nx05Shawnx10x01x1arliuxy@emqx.io'
檢查 Websocket 端收到主題為 person/Shawn
的消息:
{"email":"liuxy@emqx.io","id":1,"name":"Shawn"}
設(shè)備發(fā)布一個(gè)使用 Avro 編碼的二進(jìn)制消息,需要通過(guò)規(guī)則引擎匹配過(guò)后,將消息重新發(fā)布到與 "name" 字段相關(guān)的主題上。主題的格式為 "avro_user/${name}"。
比如,將 "name" 字段為 "Shawn" 的消息重新發(fā)布到主題 "avro_user/Shawn"。
在 EMQ X 的 Dashboard 界面,使用下面的參數(shù)創(chuàng)建一個(gè) Avro Schema:
名稱:avro_user
編解碼類(lèi)型:avro
Schema:
{ "type":"record", "fields":[ {"name":"name", "type":"string"}, {"name":"favorite_number", "type":["int", "null"]}, {"name":"favorite_color", "type":["string", "null"]} ] }
Schema 創(chuàng)建完成后,emqx 會(huì)分配一個(gè) Schema ID 和 Version。如果是第一次創(chuàng)建 "avro_user",Schema ID 為 "avro_user:1.0"。
使用剛才創(chuàng)建好的 Schema ID 來(lái)編寫(xiě)規(guī)則 SQL 語(yǔ)句:
SELECT schema_decode('avro_user:1.0', payload) as avro_user, payload FROM "message.publish" WHERE topic =~ 't/#' and avro_user.name = 'Shawn'
這里的關(guān)鍵點(diǎn)在于 schema_decode('avro_user:1.0', payload)
:
schema_decode
函數(shù)將 payload 字段的內(nèi)容按照 'avro_user:1.0' 這個(gè) Schema 來(lái)做解碼;
as avro_user
將解碼后的值保存到變量 "avro_user" 里。
然后使用以下參數(shù)添加動(dòng)作:
動(dòng)作類(lèi)型:消息重新發(fā)布
目的主題:avro_user/${avro_user.name}
消息內(nèi)容模板:${avro_user}
這個(gè)動(dòng)作將解碼之后的 "user" 以 JSON 的格式發(fā)送到 avro_user/${avro_user.name}
這個(gè)主題。其中${avro_user.name}
是個(gè)變量占位符,將在運(yùn)行時(shí)被替換為消息內(nèi)容中 "name" 字段的值。
規(guī)則創(chuàng)建好之后,就可以模擬數(shù)據(jù)進(jìn)行測(cè)試了。
下面的代碼使用 Python 語(yǔ)言填充了一個(gè) User 消息并編碼為二進(jìn)制數(shù)據(jù),然后將其發(fā)送到 "t/1" 主題。詳見(jiàn) 完整代碼。
def publish_msg(client): datum_w = avro.io.DatumWriter(SCHEMA) buf = io.BytesIO() encoder = avro.io.BinaryEncoder(buf) datum_w.write({"name": "Shawn", "favorite_number": 666, "favorite_color": "red"}, encoder) message = buf.getvalue() topic = "t/1" print("publish to topic: t/1, payload:", message) client.publish(topic, payload=message, qos=0, retain=False)
在 Dashboard 的 Websocket 工具里,登錄一個(gè) MQTT Client 并訂閱 "avro_user/#"。
安裝 python 依賴,并執(zhí)行設(shè)備端代碼:
$ pip3 install protobuf $ pip3 install paho-mqtt $ python3 avro_mqtt.py Connected with result code 0 publish to topic: t/1, payload: b'nShawnx00xb4nx00x06red'
檢查 Websocket 端收到主題為 avro_user/Shawn
的消息:
{"favorite_color":"red","favorite_number":666,"name":"Shawn"}
設(shè)備發(fā)布一個(gè)任意的消息,驗(yàn)證自部署的編解碼服務(wù)能正常工作。
在 EMQ X 的 Dashboard 界面,使用下面的參數(shù)創(chuàng)建一個(gè) 3rd-Party Schema:
名稱:my_parser
編解碼類(lèi)型:3rd-party
第三方類(lèi)型: HTTP
URL: http://127.0.0.1:9003/parser
編解碼配置: xor
其他配置保持默認(rèn)。emqx 會(huì)分配一個(gè) Schema ID "my_parser"。自定義編解碼沒(méi)有 Version 管理。
上面第 5 項(xiàng)編解碼配置是個(gè)可選項(xiàng),是個(gè)字符串,內(nèi)容跟編解碼服務(wù)的業(yè)務(wù)相關(guān)。
使用剛才創(chuàng)建好的 Schema ID 來(lái)編寫(xiě)規(guī)則 SQL 語(yǔ)句:
SELECT schema_encode('my_parser', payload) as encoded_data, schema_decode('my_parser', encoded_data) as decoded_data FROM "message.publish" WHERE topic =~ 't/#'
這個(gè) SQL 語(yǔ)句首先對(duì)數(shù)據(jù)做了 Encode,然后又做了 Decode,目的在于驗(yàn)證編解碼過(guò)程是否正確:
schema_encode
函數(shù)將 payload 字段的內(nèi)容按照 'my_parser' 這個(gè) Schema 來(lái)做編碼,結(jié)果存儲(chǔ)到 encoded_data
這個(gè)變量里;
schema_decode
函數(shù)將 payload 字段的內(nèi)容按照 'my_parser' 這個(gè) Schema 來(lái)做解碼,結(jié)果存儲(chǔ)到 decoded_data
這個(gè)變量里;
最終這個(gè) SQL 語(yǔ)句的篩選結(jié)果是 encoded_data
和 decoded_data
這兩個(gè)變量。
然后使用以下參數(shù)添加動(dòng)作:
動(dòng)作類(lèi)型:檢查(調(diào)試)
這個(gè)檢查動(dòng)作會(huì)把 SQL 語(yǔ)句篩選的結(jié)果打印到 emqx 控制臺(tái) (erlang shell) 里。
如果是使用 emqx console 啟動(dòng)的服務(wù),打印會(huì)直接顯示在控制臺(tái)里;如果是使用 emqx start 啟動(dòng)的服務(wù),打印會(huì)輸出到日志目錄下的 erlang.log.N 文件里,這里 "N" 為整數(shù),比如 "erlang.log.1", "erlang.log.2"。
規(guī)則創(chuàng)建好之后,就可以模擬數(shù)據(jù)進(jìn)行測(cè)試了。所以首先需要編寫(xiě)一個(gè)自己的編解碼服務(wù)。
下面的代碼使用 Python 語(yǔ)言實(shí)現(xiàn)了一個(gè) HTTP 編解碼服務(wù),為簡(jiǎn)單起見(jiàn),這個(gè)服務(wù)提供兩種簡(jiǎn)單的方式來(lái)進(jìn)行編解碼(加解密),詳見(jiàn) 完整代碼:
按位異或
字符替換
def xor(data): """ >>> xor(xor(b'abc')) b'abc' >>> xor(xor(b'!}~*')) b'!}~*' """ length = len(data) bdata = bytearray(data) bsecret = bytearray(secret * length) result = bytearray(length) for i in range(length): result[i] = bdata[i] ^ bsecret[i] return bytes(result) def subst(dtype, data, n): """ >>> subst('decode', b'abc', 3) b'def' >>> subst('decode', b'ab~', 1) b'bc!' >>> subst('encode', b'def', 3) b'abc' >>> subst('encode', b'bc!', 1) b'ab~' """ adata = array.array('B', data) for i in range(len(adata)): if dtype == 'decode': adata[i] = shift(adata[i], n) elif dtype == 'encode': adata[i] = shift(adata[i], -n) return bytes(adata)
將這個(gè)服務(wù)運(yùn)行起來(lái):
$ pip3 install flask $ python3 http_parser_server.py * Serving Flask app "http_parser_server" (lazy loading) * Environment: production WARNING: This is a development server. Do not use it in a production deployment. Use a production WSGI server instead. * Debug mode: off * Running on http://127.0.0.1:9003/ (Press CTRL+C to quit)
由于本示例比較簡(jiǎn)單,我們直接使用 MQTT Websocket 客戶端來(lái)模擬設(shè)備端發(fā)一條消息。
在 Dashboard 的 Websocket 工具里,登錄一個(gè) MQTT Client 并發(fā)布一條消息到 "t/1",內(nèi)容為 "hello"。
檢查 emqx 控制臺(tái) (erlang shell) 里的打印:
(emqx@127.0.0.1)1> [inspect] Selected Data: #{decoded_data => <<"hello">>, encoded_data => <<9,4,13,13,14>>} Envs: #{event => 'message.publish', flags => #{dup => false,retain => false}, from => <<"mqttjs_76e5a35b">>, headers => #{allow_publish => true, peername => {{127,0,0,1},54753}, username => <<>>}, id => <<0,5,146,30,146,38,123,81,244,66,0,0,62,117,0,1>>, node => 'emqx@127.0.0.1',payload => <<"hello">>,qos => 0, timestamp => {1568,34882,222929}, topic => <<"t/1">>} Action Init Params: #{}
Select Data 是經(jīng)過(guò) SQL 語(yǔ)句篩選之后的數(shù)據(jù),Envs 是規(guī)則引擎內(nèi)部可用的環(huán)境變量,Action Init Params 是動(dòng)作的初始化參數(shù)。這三個(gè)數(shù)據(jù)均為 Map
格式。
Selected Data 里面的兩個(gè)字段 decoded_data
和 encoded_data
對(duì)應(yīng) SELECT 語(yǔ)句里面的兩個(gè) AS。因?yàn)?decoded_data
是編碼然后再解碼之后的結(jié)果,所以它又被還原為了我們發(fā)送的內(nèi)容 "hello",表明編解碼插件工作正常。
到此,相信大家對(duì)“Schema Registry的使用教程”有了更深的了解,不妨來(lái)實(shí)際操作一番吧!這里是億速云網(wǎng)站,更多相關(guān)內(nèi)容可以進(jìn)入相關(guān)頻道進(jìn)行查詢,關(guān)注我們,繼續(xù)學(xué)習(xí)!
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。