您好,登錄后才能下訂單哦!
這篇文章主要介紹RocketMQ設(shè)計之同步刷盤的示例分析,文中介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們一定要看完!
在同步刷盤模式下,當消息寫到內(nèi)存后,會等待數(shù)據(jù)寫到磁盤的CommitLog
文件。
CommitLog的handleDiskFlush方法:
public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) { // Synchronization flush if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) { final GroupCommitService service = (GroupCommitService) this.flushCommitLogService; if (messageExt.isWaitStoreMsgOK()) { GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes()); service.putRequest(request); boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout()); if (!flushOK) { log.error("do groupcommit, wait for flush failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags() + " client address: " + messageExt.getBornHostString()); putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT); } } else { service.wakeup(); } } // Asynchronous flush else { if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) { flushCommitLogService.wakeup(); } else { commitLogService.wakeup(); } } } class GroupCommitService extends FlushCommitLogService { private volatile List<GroupCommitRequest> requestsWrite = new ArrayList<GroupCommitRequest>(); private volatile List<GroupCommitRequest> requestsRead = new ArrayList<GroupCommitRequest>(); //提交刷盤任務(wù)到任務(wù)列表 public synchronized void putRequest(final GroupCommitRequest request) { synchronized (this.requestsWrite) { this.requestsWrite.add(request); } if (hasNotified.compareAndSet(false, true)) { waitPoint.countDown(); // notify } } private void swapRequests() { List<GroupCommitRequest> tmp = this.requestsWrite; this.requestsWrite = this.requestsRead; this.requestsRead = tmp; } private void doCommit() { synchronized (this.requestsRead) { if (!this.requestsRead.isEmpty()) { for (GroupCommitRequest req : this.requestsRead) { // There may be a message in the next file, so a maximum of // two times the flush boolean flushOK = false; for (int i = 0; i < 2 && !flushOK; i++) { flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset(); if (!flushOK) { CommitLog.this.mappedFileQueue.flush(0); } } req.wakeupCustomer(flushOK); } long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp(); if (storeTimestamp > 0) { CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp); } this.requestsRead.clear(); } else { // Because of individual messages is set to not sync flush, it // will come to this process CommitLog.this.mappedFileQueue.flush(0); } } } public void run() { CommitLog.log.info(this.getServiceName() + " service started"); while (!this.isStopped()) { try { this.waitForRunning(10); this.doCommit(); } catch (Exception e) { CommitLog.log.warn(this.getServiceName() + " service has exception. ", e); } } // Under normal circumstances shutdown, wait for the arrival of the // request, and then flush try { Thread.sleep(10); } catch (InterruptedException e) { CommitLog.log.warn("GroupCommitService Exception, ", e); } synchronized (this) { this.swapRequests(); } this.doCommit(); CommitLog.log.info(this.getServiceName() + " service end"); } @Override protected void onWaitEnd() { this.swapRequests(); } @Override public String getServiceName() { return GroupCommitService.class.getSimpleName(); } @Override public long getJointime() { return 1000 * 60 * 5; } }
GroupCommitRequest
是刷盤任務(wù),提交刷盤任務(wù)后,會在刷盤隊列中等待刷盤,而刷盤線程
GroupCommitService
每隔10毫秒寫一批數(shù)據(jù)到磁盤。之所以不直接寫是磁盤io壓力大,寫入性能低,每隔10毫秒寫一次可以提升磁盤io效率和寫入性能。
putRequest(request) 提交刷盤任務(wù)到任務(wù)列表
request.waitForFlush同步等待GroupCommitService
將任務(wù)列表中的任務(wù)刷盤完成。
兩個隊列讀寫分離,requestsWrite
是寫隊列,用戶保存添加進來的刷盤任務(wù),requestsRead
是讀隊列,在刷盤之前會把寫隊列的數(shù)據(jù)放入讀隊列。
CommitLog的doCommit方法:
private void doCommit() { synchronized (this.requestsRead) { if (!this.requestsRead.isEmpty()) { for (GroupCommitRequest req : this.requestsRead) { // There may be a message in the next file, so a maximum of // two times the flush boolean flushOK = false; for (int i = 0; i < 2 && !flushOK; i++) { //根據(jù)offset確定是否已經(jīng)刷盤 flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset(); if (!flushOK) { CommitLog.this.mappedFileQueue.flush(0); } } req.wakeupCustomer(flushOK); } long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp(); if (storeTimestamp > 0) { CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp); } //清空已刷盤的列表 this.requestsRead.clear(); } else { // Because of individual messages is set to not sync flush, it // will come to this process CommitLog.this.mappedFileQueue.flush(0); } } }
刷盤的時候依次讀取requestsRead
中的數(shù)據(jù)寫入磁盤,
寫入完成后清空requestsRead
。
讀寫分離設(shè)計的目的是在刷盤時不影響任務(wù)提交到列表。
CommitLog.this.mappedFileQueue.flush(0);是刷盤操作:
public boolean flush(final int flushLeastPages) { boolean result = true; MappedFile mappedFile = this.findMappedFileByOffset(this.flushedWhere, this.flushedWhere == 0); if (mappedFile != null) { long tmpTimeStamp = mappedFile.getStoreTimestamp(); int offset = mappedFile.flush(flushLeastPages); long where = mappedFile.getFileFromOffset() + offset; result = where == this.flushedWhere; this.flushedWhere = where; if (0 == flushLeastPages) { this.storeTimestamp = tmpTimeStamp; } } return result; }
通過MappedFile映射的CommitLog文件寫入磁盤
這就是RocketMQ高可用設(shè)計之同步刷盤的基本情況了,大體思路就是一個讀寫分離的隊列來刷盤,同步刷盤任務(wù)提交后會在刷盤隊列中等待刷盤完成后再返回,而GroupCommitService每隔10毫秒寫一批數(shù)據(jù)到磁盤。
以上是“RocketMQ設(shè)計之同步刷盤的示例分析”這篇文章的所有內(nèi)容,感謝各位的閱讀!希望分享的內(nèi)容對大家有幫助,更多相關(guān)知識,歡迎關(guān)注億速云行業(yè)資訊頻道!
免責聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。