溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

Schema Registry的使用教程

發(fā)布時(shí)間:2021-09-10 10:29:15 來(lái)源:億速云 閱讀:254 作者:chen 欄目:互聯(lián)網(wǎng)科技

本篇內(nèi)容主要講解“Schema Registry的使用教程”,感興趣的朋友不妨來(lái)看看。本文介紹的方法操作簡(jiǎn)單快捷,實(shí)用性強(qiáng)。下面就讓小編來(lái)帶大家學(xué)習(xí)“Schema Registry的使用教程”吧!

物聯(lián)網(wǎng)設(shè)備終端種類(lèi)繁雜,各廠商使用的編碼格式各異,所以在接入物聯(lián)網(wǎng)平臺(tái)的時(shí)候就產(chǎn)生了統(tǒng)一數(shù)據(jù)格式的需求,以便平臺(tái)之上的應(yīng)用進(jìn)行設(shè)備管理。

EMQ X 企業(yè)版 3.4.0 提供了 Schema Registry 功能,提供編解碼能力。Schema Registry 管理編解碼使用的 Schema、處理編碼或解碼請(qǐng)求并返回結(jié)果。Schema Registry 配合規(guī)則引擎,可適配各種場(chǎng)景的設(shè)備接入和規(guī)則設(shè)計(jì)。

數(shù)據(jù)格式

下圖展示了 Schema Registry 的一個(gè)應(yīng)用案例。多個(gè)設(shè)備上報(bào)不同格式的數(shù)據(jù),經(jīng)過(guò) Schema Registry 解碼之后,變?yōu)榻y(tǒng)一的內(nèi)部格式,然后轉(zhuǎn)發(fā)給后臺(tái)應(yīng)用。

Schema Registry的使用教程

[圖1: 使用 Schema Registry 對(duì)設(shè)備數(shù)據(jù)進(jìn)行編解碼]

二進(jìn)制格式支持

EMQ X 3.4.0 內(nèi)置的 Schema Registry 數(shù)據(jù)格式包括 Avro 和 Protobuf。Avro 和 Protobuf 是依賴 Schema 的數(shù)據(jù)格式,編碼后的數(shù)據(jù)為二進(jìn)制,使用 Schema Registry 解碼后的內(nèi)部數(shù)據(jù)格式(Map,稍后講解) 可直接被規(guī)則引擎和其他插件使用。此外 Schema Registry 支持用戶自定義的 (3rd-party) 編解碼服務(wù),通過(guò) HTTP 或 TCP 回調(diào)的方式,進(jìn)行更加貼近業(yè)務(wù)需求的編解碼。

架構(gòu)設(shè)計(jì)

Schema Registry 為 Avro 和 Protobuf 等內(nèi)置編碼格式維護(hù) Schema 文本,但對(duì)于自定義編解碼 (3rd-party) 格式,如需要 Schema,Schema 文本需由編解碼服務(wù)自己維護(hù)。Schema Registry 為每個(gè) Schema 創(chuàng)建一個(gè) Schema ID,Schema API 提供了通過(guò) Schema ID 的添加、查詢和刪除操作。

Schema Registry 既可以解碼,也可以編碼。編碼和解碼時(shí)需要指定 Schema ID。

Schema Registry的使用教程

[圖2: Schema Registry 架構(gòu)示意圖]

編碼調(diào)用示例:參數(shù)為 Schema

schema_encode(SchemaID, Data) -> RawData

解碼調(diào)用示例:

schema_decode(SchemaID, RawData) -> Data

常見(jiàn)的使用案例是,使用規(guī)則引擎來(lái)調(diào)用 Schema Registry 提供的編碼和解碼接口,然后將編碼或解碼后的數(shù)據(jù)作為后續(xù)動(dòng)作的輸入。

編解碼 + 規(guī)則引擎

EMQ X 的消息處理層面可分為消息路由(Messaging)、規(guī)則引擎(Rule Engine)、數(shù)據(jù)格式轉(zhuǎn)換(Data Conversion) 三個(gè)部分。

EMQ X 的 PUB/SUB 系統(tǒng)將消息路由到指定的主題。規(guī)則引擎可以靈活地配置數(shù)據(jù)的業(yè)務(wù)規(guī)則,按規(guī)則匹配消息,然后指定相應(yīng)動(dòng)作。數(shù)據(jù)格式轉(zhuǎn)換發(fā)生在規(guī)則匹配的過(guò)程之前,先將數(shù)據(jù)轉(zhuǎn)換為可參與規(guī)則匹配的 Map 格式,然后進(jìn)行匹配。

Schema Registry的使用教程

[圖3: Messaging, Rule Engine and Schema Registry]

規(guī)則引擎內(nèi)部數(shù)據(jù)格式(Map)

規(guī)則引擎內(nèi)部使用的數(shù)據(jù)格式為 Erlang Map,所以如果原數(shù)據(jù)內(nèi)容為二進(jìn)制或者其他格式,必須使用編解碼函數(shù)(比如上面提到的 schema_decode 和 json_decode 函數(shù)) 將其轉(zhuǎn)換為 Map。

Map 是一個(gè) Key-Value 形式的數(shù)據(jù)結(jié)構(gòu),形如 #{key => value}。例如,user = #{id => 1, name => "Steve"} 定義了一個(gè) id1,name"Steve"user Map。

SQL 語(yǔ)句提供了 "." 操作符嵌套地提取和添加 Map 字段。下面是使用 SQL 語(yǔ)句對(duì)這個(gè) Map 操作的示例:

SELECT user.id AS my_id

SQL 語(yǔ)句的篩選結(jié)果為 #{my_id => 1}

JSON 編解碼

規(guī)則引擎的 SQL 語(yǔ)句提供了對(duì) JSON 格式字符串的編解碼支持,將 JSON 字符串和 Map 格式相互轉(zhuǎn)換的 SQL 函數(shù)為 json_decode() 和 json_encode():

SELECT json_decode(payload) AS p FROM "message.publish" WHERE p.x = p.y, topic ~= "t/#"

上面這個(gè) SQL 語(yǔ)句將會(huì)匹配到 payload 內(nèi)容為 JSON 字符串: {"x" = 1, "y" = 1} , 并且 topic 為 t/a 的 MQTT 消息。

json_decode(payload) as p 將 JSON 字符串解碼為下面的 Map 數(shù)據(jù)結(jié)構(gòu),從而可以在 WHERE 子句中使用 p.x 和 p.y 使用 Map 中的字段:

#{
  p => #{
    x => 1,
    y => 1
  }
}

注意: AS 子句是必須的,將解碼之后的數(shù)據(jù)賦值給某個(gè)Key,后面才能對(duì)其進(jìn)行后續(xù)操作。

編解碼實(shí)戰(zhàn)

Protobuf 數(shù)據(jù)解析舉例

規(guī)則需求

設(shè)備發(fā)布一個(gè)使用 Protobuf 編碼的二進(jìn)制消息,需要通過(guò)規(guī)則引擎匹配過(guò)后,將消息重新發(fā)布到與 "name" 字段相關(guān)的主題上。主題的格式為 "person/${name}"。

比如,將 "name" 字段為 "Shawn" 的消息重新發(fā)布到主題 "person/Shawn"。

創(chuàng)建 Schema

在 EMQ X 的 Dashboard 界面,使用下面的參數(shù)創(chuàng)建一個(gè) Protobuf Schema:

  1. 名稱:protobuf_person

  2. 編解碼類(lèi)型:protobuf

  3. Schema:下面的 protobuf schema 定義了一個(gè) Person 消息。

    message Person {
      required string name = 1;
      required int32 id = 2;
      optional string email = 3;
    }


Schema 創(chuàng)建完成后,emqx 會(huì)分配一個(gè) Schema ID 和 Version。如果是第一次創(chuàng)建 "protobuf_person",Schema ID 為 "protobuf_person:1.0"。

創(chuàng)建規(guī)則

使用剛才創(chuàng)建好的 Schema ID 來(lái)編寫(xiě)規(guī)則 SQL 語(yǔ)句:

SELECT
  schema_decode('protobuf_person:1.0', payload, 'Person') as person, payload
FROM
  "message.publish"
WHERE
  topic =~ 't/#' and person.name = 'Shawn'

這里的關(guān)鍵點(diǎn)在于 schema_decode('protobuf_person:1.0', payload, 'Person'):

  • schema_decode 函數(shù)將 payload 字段的內(nèi)容按照 'protobuf_person:1.0' 這個(gè) Schema 來(lái)做解碼;

  • as person 將解碼后的值保存到變量 "person" 里;

  • 最后一個(gè)參數(shù) Person 指明了 payload 中的消息的類(lèi)型是 protobuf schema 里定義的 'Person' 類(lèi)型。

然后使用以下參數(shù)添加動(dòng)作:

  • 動(dòng)作類(lèi)型:消息重新發(fā)布

  • 目的主題:person/${person.name}

  • 消息內(nèi)容模板:${person}

這個(gè)動(dòng)作將解碼之后的 "person" 以 JSON 的格式發(fā)送到 person/${person.name} 這個(gè)主題。其中${person.name} 是個(gè)變量占位符,將在運(yùn)行時(shí)被替換為消息內(nèi)容中 "name" 字段的值。

設(shè)備端代碼

規(guī)則創(chuàng)建好之后,就可以模擬數(shù)據(jù)進(jìn)行測(cè)試了。

下面的代碼使用 Python 語(yǔ)言填充了一個(gè) Person 消息并編碼為二進(jìn)制數(shù)據(jù),然后將其發(fā)送到 "t/1" 主題。詳見(jiàn) 完整代碼。

def publish_msg(client):
    p = person_pb2.Person()
    p.id = 1
    p.name = "Shawn"
    p.email = "liuxy@emqx.io"
    message = p.SerializeToString()
    topic = "t/1"
    print("publish to topic: t/1, payload:", message)
    client.publish(topic, payload=message, qos=0, retain=False)
檢查規(guī)則執(zhí)行結(jié)果
  1. 在 Dashboard 的 Websocket 工具里,登錄一個(gè) MQTT Client 并訂閱 "person/#"。

  2. 安裝 python 依賴,并執(zhí)行設(shè)備端代碼:

$ pip3 install protobuf
$ pip3 install paho-mqtt

$ python3 ./pb2_mqtt.py
Connected with result code 0
publish to topic: t/1, payload: b'nx05Shawnx10x01x1arliuxy@emqx.io'
t/1 b'nx05Shawnx10x01x1arliuxy@emqx.io'
  1. 檢查 Websocket 端收到主題為 person/Shawn 的消息:

{"email":"liuxy@emqx.io","id":1,"name":"Shawn"}

Avro 數(shù)據(jù)解析舉例

規(guī)則需求

設(shè)備發(fā)布一個(gè)使用 Avro 編碼的二進(jìn)制消息,需要通過(guò)規(guī)則引擎匹配過(guò)后,將消息重新發(fā)布到與 "name" 字段相關(guān)的主題上。主題的格式為 "avro_user/${name}"。

比如,將 "name" 字段為 "Shawn" 的消息重新發(fā)布到主題 "avro_user/Shawn"。

創(chuàng)建 Schema

在 EMQ X 的 Dashboard 界面,使用下面的參數(shù)創(chuàng)建一個(gè) Avro Schema:

  1. 名稱:avro_user

  2. 編解碼類(lèi)型:avro

  3. Schema:

    {
     "type":"record",
     "fields":[
         {"name":"name", "type":"string"},
         {"name":"favorite_number", "type":["int", "null"]},
         {"name":"favorite_color", "type":["string", "null"]}
     ]
    }


Schema 創(chuàng)建完成后,emqx 會(huì)分配一個(gè) Schema ID 和 Version。如果是第一次創(chuàng)建 "avro_user",Schema ID 為 "avro_user:1.0"。

創(chuàng)建規(guī)則

使用剛才創(chuàng)建好的 Schema ID 來(lái)編寫(xiě)規(guī)則 SQL 語(yǔ)句:

SELECT
  schema_decode('avro_user:1.0', payload) as avro_user, payload
FROM
  "message.publish"
WHERE
  topic =~ 't/#' and avro_user.name = 'Shawn'

這里的關(guān)鍵點(diǎn)在于 schema_decode('avro_user:1.0', payload):

  • schema_decode 函數(shù)將 payload 字段的內(nèi)容按照 'avro_user:1.0' 這個(gè) Schema 來(lái)做解碼;

  • as avro_user 將解碼后的值保存到變量 "avro_user" 里。

然后使用以下參數(shù)添加動(dòng)作:

  • 動(dòng)作類(lèi)型:消息重新發(fā)布

  • 目的主題:avro_user/${avro_user.name}

  • 消息內(nèi)容模板:${avro_user}

這個(gè)動(dòng)作將解碼之后的 "user" 以 JSON 的格式發(fā)送到 avro_user/${avro_user.name} 這個(gè)主題。其中${avro_user.name} 是個(gè)變量占位符,將在運(yùn)行時(shí)被替換為消息內(nèi)容中 "name" 字段的值。

設(shè)備端代碼

規(guī)則創(chuàng)建好之后,就可以模擬數(shù)據(jù)進(jìn)行測(cè)試了。

下面的代碼使用 Python 語(yǔ)言填充了一個(gè) User 消息并編碼為二進(jìn)制數(shù)據(jù),然后將其發(fā)送到 "t/1" 主題。詳見(jiàn) 完整代碼。

def publish_msg(client):
    datum_w = avro.io.DatumWriter(SCHEMA)
    buf = io.BytesIO()
    encoder = avro.io.BinaryEncoder(buf)
    datum_w.write({"name": "Shawn", "favorite_number": 666, "favorite_color": "red"}, encoder)
    message = buf.getvalue()
    topic = "t/1"
    print("publish to topic: t/1, payload:", message)
    client.publish(topic, payload=message, qos=0, retain=False)
檢查規(guī)則執(zhí)行結(jié)果
  1. 在 Dashboard 的 Websocket 工具里,登錄一個(gè) MQTT Client 并訂閱 "avro_user/#"。

  2. 安裝 python 依賴,并執(zhí)行設(shè)備端代碼:

$ pip3 install protobuf
$ pip3 install paho-mqtt

$ python3 avro_mqtt.py
Connected with result code 0
publish to topic: t/1, payload: b'nShawnx00xb4nx00x06red'
  1. 檢查 Websocket 端收到主題為 avro_user/Shawn 的消息:

{"favorite_color":"red","favorite_number":666,"name":"Shawn"}

自定義編解碼舉例

規(guī)則需求

設(shè)備發(fā)布一個(gè)任意的消息,驗(yàn)證自部署的編解碼服務(wù)能正常工作。

創(chuàng)建 Schema

在 EMQ X 的 Dashboard 界面,使用下面的參數(shù)創(chuàng)建一個(gè) 3rd-Party Schema:

  1. 名稱:my_parser

  2. 編解碼類(lèi)型:3rd-party

  3. 第三方類(lèi)型: HTTP

  4. URL: http://127.0.0.1:9003/parser

  5. 編解碼配置: xor

其他配置保持默認(rèn)。emqx 會(huì)分配一個(gè) Schema ID "my_parser"。自定義編解碼沒(méi)有 Version 管理。

上面第 5 項(xiàng)編解碼配置是個(gè)可選項(xiàng),是個(gè)字符串,內(nèi)容跟編解碼服務(wù)的業(yè)務(wù)相關(guān)。

創(chuàng)建規(guī)則

使用剛才創(chuàng)建好的 Schema ID 來(lái)編寫(xiě)規(guī)則 SQL 語(yǔ)句:

SELECT
  schema_encode('my_parser', payload) as encoded_data,
  schema_decode('my_parser', encoded_data) as decoded_data
FROM
  "message.publish"
WHERE
  topic =~ 't/#'

這個(gè) SQL 語(yǔ)句首先對(duì)數(shù)據(jù)做了 Encode,然后又做了 Decode,目的在于驗(yàn)證編解碼過(guò)程是否正確:

  • schema_encode 函數(shù)將 payload 字段的內(nèi)容按照 'my_parser' 這個(gè) Schema 來(lái)做編碼,結(jié)果存儲(chǔ)到 encoded_data 這個(gè)變量里;

  • schema_decode 函數(shù)將 payload 字段的內(nèi)容按照 'my_parser' 這個(gè) Schema 來(lái)做解碼,結(jié)果存儲(chǔ)到 decoded_data 這個(gè)變量里;

最終這個(gè) SQL 語(yǔ)句的篩選結(jié)果是 encoded_datadecoded_data 這兩個(gè)變量。

然后使用以下參數(shù)添加動(dòng)作:

  • 動(dòng)作類(lèi)型:檢查(調(diào)試)

這個(gè)檢查動(dòng)作會(huì)把 SQL 語(yǔ)句篩選的結(jié)果打印到 emqx 控制臺(tái) (erlang shell) 里。

如果是使用 emqx console 啟動(dòng)的服務(wù),打印會(huì)直接顯示在控制臺(tái)里;如果是使用 emqx start 啟動(dòng)的服務(wù),打印會(huì)輸出到日志目錄下的 erlang.log.N 文件里,這里 "N" 為整數(shù),比如 "erlang.log.1", "erlang.log.2"。

編解碼服務(wù)端代碼

規(guī)則創(chuàng)建好之后,就可以模擬數(shù)據(jù)進(jìn)行測(cè)試了。所以首先需要編寫(xiě)一個(gè)自己的編解碼服務(wù)。

下面的代碼使用 Python 語(yǔ)言實(shí)現(xiàn)了一個(gè) HTTP 編解碼服務(wù),為簡(jiǎn)單起見(jiàn),這個(gè)服務(wù)提供兩種簡(jiǎn)單的方式來(lái)進(jìn)行編解碼(加解密),詳見(jiàn) 完整代碼:

  • 按位異或

  • 字符替換

def xor(data):
  """
  >>> xor(xor(b'abc'))
  b'abc'
  >>> xor(xor(b'!}~*'))
  b'!}~*'
  """
  length = len(data)
  bdata = bytearray(data)
  bsecret = bytearray(secret * length)
  result = bytearray(length)
  for i in range(length):
    result[i] = bdata[i] ^ bsecret[i]
  return bytes(result)

def subst(dtype, data, n):
  """
  >>> subst('decode', b'abc', 3)
  b'def'
  >>> subst('decode', b'ab~', 1)
  b'bc!'
  >>> subst('encode', b'def', 3)
  b'abc'
  >>> subst('encode', b'bc!', 1)
  b'ab~'
  """
  adata = array.array('B', data)
  for i in range(len(adata)):
    if dtype == 'decode':
      adata[i] = shift(adata[i], n)
    elif dtype == 'encode':
      adata[i] = shift(adata[i], -n)
  return bytes(adata)

將這個(gè)服務(wù)運(yùn)行起來(lái):

$ pip3 install flask
$ python3 http_parser_server.py
 * Serving Flask app "http_parser_server" (lazy loading)
 * Environment: production
   WARNING: This is a development server. Do not use it in a production deployment.
   Use a production WSGI server instead.
 * Debug mode: off
 * Running on http://127.0.0.1:9003/ (Press CTRL+C to quit)
檢查規(guī)則執(zhí)行結(jié)果

由于本示例比較簡(jiǎn)單,我們直接使用 MQTT Websocket 客戶端來(lái)模擬設(shè)備端發(fā)一條消息。

  1. 在 Dashboard 的 Websocket 工具里,登錄一個(gè) MQTT Client 并發(fā)布一條消息到 "t/1",內(nèi)容為 "hello"。

  2. 檢查 emqx 控制臺(tái) (erlang shell) 里的打印:

(emqx@127.0.0.1)1> [inspect]
        Selected Data: #{decoded_data => <<"hello">>,
                         encoded_data => <<9,4,13,13,14>>}
        Envs: #{event => 'message.publish',
                flags => #{dup => false,retain => false},
                from => <<"mqttjs_76e5a35b">>,
                headers =>
                    #{allow_publish => true,
                      peername => {{127,0,0,1},54753},
                      username => <<>>},
                id => <<0,5,146,30,146,38,123,81,244,66,0,0,62,117,0,1>>,
                node => 'emqx@127.0.0.1',payload => <<"hello">>,qos => 0,
                timestamp => {1568,34882,222929},
                topic => <<"t/1">>}
        Action Init Params: #{}

Select Data 是經(jīng)過(guò) SQL 語(yǔ)句篩選之后的數(shù)據(jù),Envs 是規(guī)則引擎內(nèi)部可用的環(huán)境變量,Action Init Params 是動(dòng)作的初始化參數(shù)。這三個(gè)數(shù)據(jù)均為 Map 格式。

Selected Data 里面的兩個(gè)字段 decoded_dataencoded_data 對(duì)應(yīng) SELECT 語(yǔ)句里面的兩個(gè) AS。因?yàn)?decoded_data 是編碼然后再解碼之后的結(jié)果,所以它又被還原為了我們發(fā)送的內(nèi)容 "hello",表明編解碼插件工作正常。

到此,相信大家對(duì)“Schema Registry的使用教程”有了更深的了解,不妨來(lái)實(shí)際操作一番吧!這里是億速云網(wǎng)站,更多相關(guān)內(nèi)容可以進(jìn)入相關(guān)頻道進(jìn)行查詢,關(guān)注我們,繼續(xù)學(xué)習(xí)!

向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI