您好,登錄后才能下訂單哦!
本篇內(nèi)容介紹了“RocketMQ ACL實(shí)現(xiàn)機(jī)制是什么”的有關(guān)知識,在實(shí)際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領(lǐng)大家學(xué)習(xí)一下如何處理這些情況吧!希望大家仔細(xì)閱讀,能夠?qū)W有所成!
根據(jù)RocketMQ ACL使用手冊,我們應(yīng)該首先看一下Broker服務(wù)器在開啟ACL機(jī)制時如何加載配置文件,并如何工作的。
Broker端ACL的入口代碼為:BrokerController#initialAcl
private void initialAcl() { if (!this.brokerConfig.isAclEnable()) { // [@1](https://my.oschina.net/u/1198) log.info("The broker dose not enable acl"); return; } List<accessvalidator> accessValidators = ServiceProvider.load(ServiceProvider.ACL_VALIDATOR_ID, AccessValidator.class); // @2 if (accessValidators == null || accessValidators.isEmpty()) { log.info("The broker dose not load the AccessValidator"); return; } for (AccessValidator accessValidator: accessValidators) { // [@3](https://my.oschina.net/u/2648711) final AccessValidator validator = accessValidator; this.registerServerRPCHook(new RPCHook() { [@Override](https://my.oschina.net/u/1162528) public void doBeforeRequest(String remoteAddr, RemotingCommand request) { //Do not catch the exception validator.validate(validator.parse(request, remoteAddr)); // @4 } @Override public void doAfterResponse(String remoteAddr, RemotingCommand request, RemotingCommand response) { } }); } }
本方法的實(shí)現(xiàn)共4個關(guān)鍵點(diǎn)。 代碼@1:首先判斷Broker是否開啟了acl,通過配置參數(shù)aclEnable指定,默認(rèn)為false。
代碼@2:使用類似SPI機(jī)制,加載配置的AccessValidator,該方法返回一個列表,其實(shí)現(xiàn)邏輯時讀取META-INF/service/org.apache.rocketmq.acl.AccessValidator文件中配置的訪問驗證器,默認(rèn)配置內(nèi)容如下:
代碼@3:遍歷配置的訪問驗證器(AccessValidator),并向Broker處理服務(wù)器注冊鉤子函數(shù),RPCHook的doBeforeRequest方法會在服務(wù)端接收到請求,將其請求解碼后,執(zhí)行處理請求之前被調(diào)用;RPCHook的doAfterResponse方法會在處理完請求后,將結(jié)果返回之前被調(diào)用,其調(diào)用如圖所示:
代碼@4:在RPCHook#doBeforeRequest方法中調(diào)用AccessValidator#validate, 在真實(shí)處理命令之前,先執(zhí)行ACL的驗證邏輯,如果擁有該操作的執(zhí)行權(quán)限,則放行,否則拋出AclException。
接下來,我們將重點(diǎn)放到Broker默認(rèn)實(shí)現(xiàn)的訪問驗證器:PlainAccessValidator。
AccessValidator 訪問驗證器接口,主要定義兩個接口。 1)AccessResource parse(RemotingCommand request, String remoteAddr) 從請求頭中解析本次請求對應(yīng)的訪問資源,即本次請求需要的訪問權(quán)限。 2)void validate(AccessResource accessResource) 根據(jù)本次需要訪問的權(quán)限,與請求用戶擁有的權(quán)限進(jìn)行對比驗證,判斷是擁有權(quán)限,如果沒有訪問該操作的權(quán)限,則拋出異常,否則放行。
PlainAccessValidator RocketMQ默認(rèn)提供的基于yml配置格式的訪問驗證器。
接下來我們重點(diǎn)看一下PlainAccessValidator的parse方法與validate方法的實(shí)現(xiàn)細(xì)節(jié)。在講解該方法之前,我們首先認(rèn)識一下RocketMQ封裝訪問資源的PlainAccessResource。
我們對其屬性一一做個介紹:
private String accessKey 訪問Key,用戶名。
private String secretKey 用戶密碼。
private String whiteRemoteAddress 遠(yuǎn)程IP地址白名單。
private boolean admin 是否是管理員角色。
private byte defaultTopicPerm = 1 默認(rèn)topic訪問權(quán)限,即如果沒有配置topic的權(quán)限,則Topic默認(rèn)的訪問權(quán)限為1,表示為DENY。
private byte defaultGroupPerm = 1 默認(rèn)的消費(fèi)組訪問權(quán)限,默認(rèn)為DENY。
private Map<string, byte> resourcePermMap 資源需要的訪問權(quán)限映射表。
private RemoteAddressStrategy remoteAddressStrategy 遠(yuǎn)程IP地址驗證策略。
private int requestCode 當(dāng)前請求的requestCode。
private byte[] content 請求頭與請求體的內(nèi)容。
private String signature 簽名字符串,這是通常的套路,在客戶端時,首先將請求參數(shù)排序,然后使用secretKey生成簽名字符串,服務(wù)端重復(fù)這個步驟,然后對比簽名字符串,如果相同,則認(rèn)為登錄成功,否則失敗。
private String secretToken 密鑰token。
private String recognition 目前作用未知,代碼中目前未被使用。
public PlainAccessValidator() { aclPlugEngine = new PlainPermissionLoader(); }
構(gòu)造函數(shù),直接創(chuàng)建PlainPermissionLoader對象,從命名上來看,應(yīng)該是觸發(fā)acl規(guī)則的加載,即解析plain_acl.yml,接下來會重點(diǎn)探討,即acl啟動流程之配置文件的解析。
該方法的作用就是從請求命令中解析出本次訪問所需要的訪問權(quán)限,最終構(gòu)建AccessResource對象,為后續(xù)的校驗權(quán)限做準(zhǔn)備。
PlainAccessResource accessResource = new PlainAccessResource(); if (remoteAddr != null && remoteAddr.contains(":")) { accessResource.setWhiteRemoteAddress(remoteAddr.split(":")[0]); } else { accessResource.setWhiteRemoteAddress(remoteAddr); }
Step1:首先創(chuàng)建PlainAccessResource,從遠(yuǎn)程地址中提取出遠(yuǎn)程訪問IP地址。
if (request.getExtFields() == null) { throw new AclException("request's extFields value is null"); } accessResource.setRequestCode(request.getCode()); accessResource.setAccessKey(request.getExtFields().get(SessionCredentials.ACCESS_KEY)); accessResource.setSignature(request.getExtFields().get(SessionCredentials.SIGNATURE)); accessResource.setSecretToken(request.getExtFields().get(SessionCredentials.SECURITY_TOKEN));
Step2:如果請求頭中的擴(kuò)展字段為空,則拋出異常,如果不為空,則從請求頭中讀取requestCode、accessKey(請求用戶名)、簽名字符串(signature)、secretToken。
try { switch (request.getCode()) { case RequestCode.SEND_MESSAGE: accessResource.addResourceAndPerm(request.getExtFields().get("topic"), Permission.PUB); break; case RequestCode.SEND_MESSAGE_V2: accessResource.addResourceAndPerm(request.getExtFields().get("b"), Permission.PUB); break; case RequestCode.CONSUMER_SEND_MSG_BACK: accessResource.addResourceAndPerm(request.getExtFields().get("originTopic"), Permission.PUB); accessResource.addResourceAndPerm(getRetryTopic(request.getExtFields().get("group")), Permission.SUB); break; case RequestCode.PULL_MESSAGE: accessResource.addResourceAndPerm(request.getExtFields().get("topic"), Permission.SUB); accessResource.addResourceAndPerm(getRetryTopic(request.getExtFields().get("consumerGroup")), Permission.SUB); break; case RequestCode.QUERY_MESSAGE: accessResource.addResourceAndPerm(request.getExtFields().get("topic"), Permission.SUB); break; case RequestCode.HEART_BEAT: HeartbeatData heartbeatData = HeartbeatData.decode(request.getBody(), HeartbeatData.class); for (ConsumerData data : heartbeatData.getConsumerDataSet()) { accessResource.addResourceAndPerm(getRetryTopic(data.getGroupName()), Permission.SUB); for (SubscriptionData subscriptionData : data.getSubscriptionDataSet()) { accessResource.addResourceAndPerm(subscriptionData.getTopic(), Permission.SUB); } } break; case RequestCode.UNREGISTER_CLIENT: final UnregisterClientRequestHeader unregisterClientRequestHeader = (UnregisterClientRequestHeader) request .decodeCommandCustomHeader(UnregisterClientRequestHeader.class); accessResource.addResourceAndPerm(getRetryTopic(unregisterClientRequestHeader.getConsumerGroup()), Permission.SUB); break; case RequestCode.GET_CONSUMER_LIST_BY_GROUP: final GetConsumerListByGroupRequestHeader getConsumerListByGroupRequestHeader = (GetConsumerListByGroupRequestHeader) request .decodeCommandCustomHeader(GetConsumerListByGroupRequestHeader.class); accessResource.addResourceAndPerm(getRetryTopic(getConsumerListByGroupRequestHeader.getConsumerGroup()), Permission.SUB); break; case RequestCode.UPDATE_CONSUMER_OFFSET: final UpdateConsumerOffsetRequestHeader updateConsumerOffsetRequestHeader = (UpdateConsumerOffsetRequestHeader) request .decodeCommandCustomHeader(UpdateConsumerOffsetRequestHeader.class); accessResource.addResourceAndPerm(getRetryTopic(updateConsumerOffsetRequestHeader.getConsumerGroup()), Permission.SUB); accessResource.addResourceAndPerm(updateConsumerOffsetRequestHeader.getTopic(), Permission.SUB); break; default: break; } } catch (Throwable t) { throw new AclException(t.getMessage(), t); }
Step3:根據(jù)請求命令,設(shè)置本次請求需要擁有的權(quán)限,上述代碼比較簡單,就是從請求中得出本次操作的Topic、消息組名稱,為了方便區(qū)分topic與消費(fèi)組,消費(fèi)組使用消費(fèi)者對應(yīng)的重試主題,當(dāng)成資源的Key,從這里也可以看出,當(dāng)前版本需要進(jìn)行ACL權(quán)限驗證的請求命令如下:
SEND_MESSAGE
SEND_MESSAGE_V2
CONSUMER_SEND_MSG_BACK
PULL_MESSAGE
QUERY_MESSAGE
HEART_BEAT
UNREGISTER_CLIENT
GET_CONSUMER_LIST_BY_GROUP
UPDATE_CONSUMER_OFFSET
// Content SortedMap<string, string> map = new TreeMap<string, string>(); for (Map.Entry<string, string> entry : request.getExtFields().entrySet()) { if (!SessionCredentials.SIGNATURE.equals(entry.getKey())) { map.put(entry.getKey(), entry.getValue()); } } accessResource.setContent(AclUtils.combineRequestContent(request, map)); return accessResource;
Step4:對擴(kuò)展字段進(jìn)行排序,便于生成簽名字符串,然后將擴(kuò)展字段與請求體(body)寫入content字段。完成從請求頭中解析出本次請求需要驗證的權(quán)限。
public void validate(AccessResource accessResource) { aclPlugEngine.validate((PlainAccessResource) accessResource); }
驗證權(quán)限,即根據(jù)本次請求需要的權(quán)限與當(dāng)前用戶所擁有的權(quán)限進(jìn)行對比,如果符合,則正常執(zhí)行;否則拋出AclException。
為了揭開配置文件的解析與驗證,我們將目光投入到PlainPermissionLoader。
該類的主要職責(zé):加載權(quán)限,即解析acl主要配置文件plain_acl.yml。
下面對其核心屬性與核心方法一一介紹:
DEFAULT_PLAIN_ACL_FILE 默認(rèn)acl配置文件名稱,默認(rèn)值為conf/plain_acl.yml。
String fileName acl配置文件名稱,默認(rèn)為DEFAULT_PLAIN_ACL_FILE ,可以通過系統(tǒng)參數(shù)-Drocketmq.acl.plain.file=fileName指定。
Map<string, plainaccessresource> plainAccessResourceMap 解析出來的權(quán)限配置映射表,以用戶名為鍵。
RemoteAddressStrategyFactory remoteAddressStrategyFactory 遠(yuǎn)程IP解析策略工廠,用于解析白名單IP地址。
boolean isWatchStart 是否開啟了文件監(jiān)聽,即自動監(jiān)聽plain_acl.yml文件,一旦該文件改變,可在不重啟服務(wù)器的情況下自動生效。
public PlainPermissionLoader() 構(gòu)造方法。
public void load() 加載配置文件。
public void validate(PlainAccessResource plainAccessResource) 驗證是否有權(quán)限訪問待訪問資源。
public PlainPermissionLoader() { load(); watch(); }
在構(gòu)造方法中調(diào)用load與watch方法。
Map<string, plainaccessresource> plainAccessResourceMap = new HashMap<>(); List<remoteaddressstrategy> globalWhiteRemoteAddressStrategy = new ArrayList<>(); String path = fileHome + File.separator + fileName; JSONObject plainAclConfData = AclUtils.getYamlDataObject(path,JSONObject.class);
Step1:初始化plainAccessResourceMap(用戶配置的訪問資源,即權(quán)限容器)、globalWhiteRemoteAddressStrategy:全局IP白名單訪問策略。配置文件,默認(rèn)為${ROCKETMQ_HOME}/conf/plain_acl.yml。
JSONArray globalWhiteRemoteAddressesList = plainAclConfData.getJSONArray("globalWhiteRemoteAddresses"); if (globalWhiteRemoteAddressesList != null && !globalWhiteRemoteAddressesList.isEmpty()) { for (int i = 0; i < globalWhiteRemoteAddressesList.size(); i++) { globalWhiteRemoteAddressStrategy.add(remoteAddressStrategyFactory. getRemoteAddressStrategy(globalWhiteRemoteAddressesList.getString(i))); } }
Step2:globalWhiteRemoteAddresses:全局白名單,類型為數(shù)組。根據(jù)配置的規(guī)則,使用remoteAddressStrategyFactory獲取一個訪問策略,下文會重點(diǎn)介紹其配置規(guī)則。
JSONArray accounts = plainAclConfData.getJSONArray("accounts"); if (accounts != null && !accounts.isEmpty()) { List<plainaccessconfig> plainAccessConfigList = accounts.toJavaList(PlainAccessConfig.class); for (PlainAccessConfig plainAccessConfig : plainAccessConfigList) { PlainAccessResource plainAccessResource = buildPlainAccessResource(plainAccessConfig); plainAccessResourceMap.put(plainAccessResource.getAccessKey(),plainAccessResource); } } this.globalWhiteRemoteAddressStrategy = globalWhiteRemoteAddressStrategy; this.plainAccessResourceMap = plainAccessResourceMap;
Step3:解析plain_acl.yml文件中的另外一個根元素accounts,用戶定義的權(quán)限信息。從PlainAccessConfig的定義來看,accounts標(biāo)簽下支持如下標(biāo)簽:
accessKey
secretKey
whiteRemoteAddress
admin
defaultTopicPerm
defaultGroupPerm
topicPerms
groupPerms 上述標(biāo)簽的說明,請參考::《RocketMQ ACL使用指南》 。具體的解析過程比較容易,就不再細(xì)說。
load方法主要完成acl配置文件的解析,將用戶定義的權(quán)限加載到內(nèi)存中。
private void watch() { try { String watchFilePath = fileHome + fileName; FileWatchService fileWatchService = new FileWatchService(new String[] {watchFilePath}, new FileWatchService.Listener() { @Override public void onChanged(String path) { log.info("The plain acl yml changed, reload the context"); load(); } }); fileWatchService.start(); log.info("Succeed to start AclWatcherService"); this.isWatchStart = true; } catch (Exception e) { log.error("Failed to start AclWatcherService", e); } }
監(jiān)聽器,默認(rèn)以500ms的頻率判斷文件的內(nèi)容是否變化。在文件內(nèi)容發(fā)生變化后調(diào)用load()方法,重新加載配置文件。那FileWatchService是如何判斷兩個文件的內(nèi)容發(fā)生了變化呢?
FileWatchService#hash private String hash(String filePath) throws IOException, NoSuchAlgorithmException { Path path = Paths.get(filePath); md.update(Files.readAllBytes(path)); byte[] hash = md.digest(); return UtilAll.bytes2string(hash); }
獲取文件md5簽名來做對比,這里為什么不在啟動時先記錄上一次文件的修改時間,然后先判斷其修改時間是否變化,再判斷其內(nèi)容是否真正發(fā)生變化。
// Check the global white remote addr for (RemoteAddressStrategy remoteAddressStrategy : globalWhiteRemoteAddressStrategy) { if (remoteAddressStrategy.match(plainAccessResource)) { return; } }
Step1:首先使用全局白名單對資源進(jìn)行驗證,只要一個規(guī)則匹配,則返回,表示認(rèn)證成功。
if (plainAccessResource.getAccessKey() == null) { throw new AclException(String.format("No accessKey is configured")); } if (!plainAccessResourceMap.containsKey(plainAccessResource.getAccessKey())) { throw new AclException(String.format("No acl config for %s", plainAccessResource.getAccessKey())); } Step2:如果請求信息中,沒有設(shè)置用戶名,則拋出未配置AccessKey異常;如果Broker中并為配置該用戶的配置信息,則拋出AclException。 // Check the white addr for accesskey PlainAccessResource ownedAccess = plainAccessResourceMap.get(plainAccessResource.getAccessKey()); if (ownedAccess.getRemoteAddressStrategy().match(plainAccessResource)) { return; }
Step3:如果用戶配置的白名單與待訪問資源規(guī)則匹配的話,則直接發(fā)認(rèn)證通過。
// Check the signature String signature = AclUtils.calSignature(plainAccessResource.getContent(), ownedAccess.getSecretKey()); if (!signature.equals(plainAccessResource.getSignature())) { throw new AclException(String.format("Check signature failed for accessKey=%s", plainAccessResource.getAccessKey())); }
Step4:驗證簽名。
checkPerm(plainAccessResource, ownedAccess);
Step5:調(diào)用checkPerm方法,驗證需要的權(quán)限與擁有的權(quán)限是否匹配。
if (Permission.needAdminPerm(needCheckedAccess.getRequestCode()) && !ownedAccess.isAdmin()) { throw new AclException(String.format("Need admin permission for request code=%d, but accessKey=%s is not", needCheckedAccess.getRequestCode(), ownedAccess.getAccessKey())); }
Step6:如果當(dāng)前的請求命令屬于必須是Admin用戶才能訪問的權(quán)限,并且當(dāng)前用戶并不是管理員角色,則拋出異常,如下命令需要admin角色才能進(jìn)行的操作:
Map<string, byte> needCheckedPermMap = needCheckedAccess.getResourcePermMap(); Map<string, byte> ownedPermMap = ownedAccess.getResourcePermMap(); if (needCheckedPermMap == null) { // If the needCheckedPermMap is null,then return return; } if (ownedPermMap == null && ownedAccess.isAdmin()) { // If the ownedPermMap is null and it is an admin user, then return return; }
Step7:如果該請求不需要進(jìn)行權(quán)限驗證,則通過認(rèn)證,如果當(dāng)前用戶的角色是管理員,并且沒有配置用戶權(quán)限,則認(rèn)證通過,返回。
for (Map.Entry<string, byte> needCheckedEntry : needCheckedPermMap.entrySet()) { String resource = needCheckedEntry.getKey(); Byte neededPerm = needCheckedEntry.getValue(); boolean isGroup = PlainAccessResource.isRetryTopic(resource); if (ownedPermMap == null || !ownedPermMap.containsKey(resource)) { // Check the default perm byte ownedPerm = isGroup ? ownedAccess.getDefaultGroupPerm() : ownedAccess.getDefaultTopicPerm(); if (!Permission.checkPermission(neededPerm, ownedPerm)) { throw new AclException(String.format("No default permission for %s", PlainAccessResource.printStr(resource, isGroup))); } continue; } if (!Permission.checkPermission(neededPerm, ownedPermMap.get(resource))) { throw new AclException(String.format("No default permission for %s", PlainAccessResource.printStr(resource, isGroup))); } }
Step8:遍歷需要權(quán)限與擁有的權(quán)限進(jìn)行對比,如果配置對應(yīng)的權(quán)限,則判斷是否匹配;如果未配置權(quán)限,則判斷默認(rèn)權(quán)限時是否允許,不允許,則拋出AclException。
驗證邏輯就介紹到這里了,下面給出其匹配流程圖:
上述闡述了從Broker服務(wù)器啟動、加載acl配置文件流程、動態(tài)監(jiān)聽配置文件、服務(wù)端權(quán)限驗證流程,接下來我們看一下客戶端關(guān)于ACL需要處理的事情。
回顧一下,我們引入ACL機(jī)制后,客戶端的代碼示例如下:
其在創(chuàng)建DefaultMQProducer時,注冊AclClientRPCHook鉤子,會在向服務(wù)端發(fā)送遠(yuǎn)程命令前后執(zhí)行其鉤子函數(shù),接下來我們重點(diǎn)分析一下AclClientRPCHook。
public void doBeforeRequest(String remoteAddr, RemotingCommand request) { byte[] total = AclUtils.combineRequestContent(request, parseRequestContent(request, sessionCredentials.getAccessKey(), sessionCredentials.getSecurityToken())); // @1 String signature = AclUtils.calSignature(total, sessionCredentials.getSecretKey()); // @2 request.addExtField(SIGNATURE, signature); // @3 request.addExtField(ACCESS_KEY, sessionCredentials.getAccessKey()); // The SecurityToken value is unneccessary,user can choose this one. if (sessionCredentials.getSecurityToken() != null) { request.addExtField(SECURITY_TOKEN, sessionCredentials.getSecurityToken()); } }
代碼@1:將Request請求參數(shù)進(jìn)行排序,并加入accessKey。
代碼@2:對排好序的請參數(shù),使用用戶配置的密碼生成簽名,并最近到擴(kuò)展字段Signature,然后服務(wù)端也會按照相同的算法生成Signature,如果相同,則表示簽名驗證成功(類似于實(shí)現(xiàn)登錄的效果)。
代碼@3:將Signature、AccessKey等加入到請求頭的擴(kuò)展字段中,服務(wù)端拿到這些元數(shù)據(jù),結(jié)合請求頭中的信息,根據(jù)配置的權(quán)限,進(jìn)行權(quán)限校驗。
關(guān)于ACL客戶端生成簽名是一種通用套路,就不在細(xì)講了。
“RocketMQ ACL實(shí)現(xiàn)機(jī)制是什么”的內(nèi)容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關(guān)的知識可以關(guān)注億速云網(wǎng)站,小編將為大家輸出更多高質(zhì)量的實(shí)用文章!
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報,并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。