您好,登錄后才能下訂單哦!
這篇文章將為大家詳細(xì)講解有關(guān)在LevelDB數(shù)據(jù)庫(kù)中如何實(shí)現(xiàn)磁盤多路歸并排序,小編覺得挺實(shí)用的,因此分享給大家做個(gè)參考,希望大家閱讀完這篇文章后可以有所收獲。
在 LevelDB 數(shù)據(jù)庫(kù)中高層數(shù)據(jù)下沉到低層時(shí)需要經(jīng)歷一次 Major Compaction,將高層文件的有序鍵值對(duì)和低層文件的多個(gè)有序鍵值對(duì)進(jìn)行歸并排序。磁盤多路歸并排序算法的輸入是來自多個(gè)磁盤文件的有序鍵值對(duì),在內(nèi)存中將這些文件的鍵值對(duì)進(jìn)行排序,然后輸出到一到多個(gè)新的磁盤文件中。
多路歸并排序在大數(shù)據(jù)領(lǐng)域也是常用的算法,常用于海量數(shù)據(jù)排序。當(dāng)數(shù)據(jù)量特別大時(shí),這些數(shù)據(jù)無法被單個(gè)機(jī)器內(nèi)存容納,它需要被切分位多個(gè)集合分別由不同的機(jī)器進(jìn)行內(nèi)存排序(map 過程),然后再進(jìn)行多路歸并算法將來自多個(gè)不同機(jī)器的數(shù)據(jù)進(jìn)行排序(reduce 過程),這是流式多路歸并排序,為什么說是流式排序呢,因?yàn)閿?shù)據(jù)源來源于網(wǎng)絡(luò)套接字。
多路歸并排序的優(yōu)勢(shì)在于內(nèi)存消耗極低,它的內(nèi)存占用和輸入文件的數(shù)量成正比,和數(shù)據(jù)總量無關(guān),數(shù)據(jù)總量只會(huì)線性正比影響排序的時(shí)間。
下面我們來親自實(shí)現(xiàn)一下磁盤多路歸并算法,為什么是磁盤,因?yàn)樗妮斎雭碜源疟P文件。
我們需要在內(nèi)存里維護(hù)一個(gè)有序數(shù)組。每個(gè)輸入文件當(dāng)前最小的元素作為一個(gè)元素放在數(shù)組里。數(shù)組按照元素的大小保持排序狀態(tài)。
接下來我們開始進(jìn)入循環(huán),循環(huán)的邏輯總是從最小的元素下手,在其所在的文件取出下一個(gè)元素,和當(dāng)前數(shù)組中的元素進(jìn)行比較。根據(jù)比較結(jié)果進(jìn)行不同的處理,這里我們使用二分查找算法進(jìn)行快速比較。注意每個(gè)輸入文件里面的元素都是有序的。
1. 如果取出來的元素和當(dāng)前數(shù)組中的最小元素相等,那么就可以直接將這個(gè)元素輸出。再繼續(xù)下一輪循環(huán)。不可能取出比當(dāng)前數(shù)組最小元素還要小的元素,因?yàn)檩斎胛募旧硪彩怯行虻摹?/p>
2. 否則就需要將元素插入到當(dāng)前的數(shù)組中的指定位置,繼續(xù)保持?jǐn)?shù)組有序。然后將數(shù)組中當(dāng)前最小的元素輸出并移除。再進(jìn)行下一輪循環(huán)。
3. 如果遇到文件結(jié)尾,那就無法繼續(xù)調(diào)用 next() 方法了,這時(shí)可以直接將數(shù)組中的最小元素輸出并移除,數(shù)組也跟著變小了。再進(jìn)行下一輪循環(huán)。當(dāng)數(shù)組空了,說明所有的文件都處理完了,算法就可以結(jié)束了。
值得注意的是,數(shù)組中永遠(yuǎn)不會(huì)存在同一個(gè)文件的兩個(gè)元素,如此才保證了數(shù)組的長(zhǎng)度不會(huì)超過輸入文件的數(shù)量,同時(shí)它也不會(huì)把沒有結(jié)尾的文件擠出數(shù)組導(dǎo)致漏排序的問題。
需要特別注意的是Java 內(nèi)置了二分查找算法在使用上比較精巧。
public class Collections {
...
public static <T> int binarySearch(List<T> list, T key) {
...
if (found) {
return index;
} else {
return -(insertIndex+1);
}
}
...
}
如果 key 可以在 list 中找到,那就直接返回相應(yīng)的位置。如果找不到,它會(huì)返回負(fù)數(shù),還不是簡(jiǎn)單的 -1,這個(gè)負(fù)數(shù)指明了插入的位置,也就是說在這個(gè)位置插入 key,數(shù)組將可以繼續(xù)保持有序。
比如 binarySearch 返回了 index=-1,那么 insertIndex 就是 -(index+1),也就是 0,插入點(diǎn)在數(shù)組開頭。如果返回了 index=-size-1,那么 insertIndex 就是 size,是數(shù)組末尾。其它負(fù)數(shù)會(huì)插入數(shù)組中間。
對(duì)于每一個(gè)輸入文件都會(huì)創(chuàng)建一個(gè) MergeSource 對(duì)象,它提供了 hasNext() 和 next() 方法用于判斷和獲取下一個(gè)元素。注意輸入文件是有序的,下一個(gè)元素就是當(dāng)前輸入文件最小的元素。
hasNext() 方法負(fù)責(zé)讀取下一行并緩存在 cachedLine 變量中,調(diào)用 next() 方法將 cachedLine 變量轉(zhuǎn)換成整數(shù)并返回。
class MergeSource implements Closeable {
private BufferedReader reader;
private String cachedLine;
private String filename;
public MergeSource(String filename) {
this.filename = filename;
try {
FileReader fr = new FileReader(filename);
this.reader = new BufferedReader(fr);
} catch (FileNotFoundException e) {
}
}
public boolean hasNext() {
String line;
try {
line = this.reader.readLine();
if (line == null || line.isEmpty()) {
return false;
}
this.cachedLine = line.trim();
return true;
} catch (IOException e) {
}
return false;
}
public int next() {
if (this.cachedLine == null) {
if (!hasNext()) {
throw new IllegalStateException("no content");
}
}
int num = Integer.parseInt(this.cachedLine);
this.cachedLine = null;
return num;
}
@Override
public void close() throws IOException {
this.reader.close();
}
}
在排序前先把這個(gè)數(shù)組準(zhǔn)備好,將每個(gè)輸入文件的最小元素放入數(shù)組,并排序。
class Bin implements Comparable<Bin> {
int num;
MergeSource source;
Bin(MergeSource source, int num) {
this.source = source;
this.num = num;
}
@Override
public int compareTo(Bin o) {
return this.num - o.num;
}
}
List<Bin> prepare() {
List<Bin> bins = new ArrayList<>();
for (MergeSource source : sources) {
Bin newBin = new Bin(source, source.next());
bins.add(newBin);
}
Collections.sort(bins);
return bins;
}
關(guān)閉輸出文件時(shí)注意要先 flush(),避免丟失 PrintWriter 中緩沖的內(nèi)容。
class MergeOut implements Closeable {
private PrintWriter writer;
public MergeOut(String filename) {
try {
FileOutputStream out = new FileOutputStream(filename);
this.writer = new PrintWriter(out);
} catch (FileNotFoundException e) {
}
}
public void write(Bin bin) {
writer.println(bin.num);
}
@Override
public void close() throws IOException {
writer.flush();
writer.close();
}
}
下面我們來生成一系列輸入文件,每個(gè)輸入文件中包含一堆隨機(jī)整數(shù)。一共生成 n 個(gè)文件,每個(gè)文件的整數(shù)數(shù)量在 minEntries 到 minEntries 之間。返回所有輸入文件的文件名列表。
List<String> generateFiles(int n, int minEntries, int maxEntries) {
List<String> files = new ArrayList<>();
for (int i = 0; i < n; i++) {
String filename = "input-" + i + ".txt";
PrintWriter writer;
try {
writer = new PrintWriter(new FileOutputStream(filename));
ThreadLocalRandom rand = ThreadLocalRandom.current();
int entries = rand.nextInt(minEntries, maxEntries);
List<Integer> nums = new ArrayList<>();
for (int k = 0; k < entries; k++) {
int num = rand.nextInt(10000000);
nums.add(num);
}
Collections.sort(nums);
for (int num : nums) {
writer.println(num);
}
writer.flush();
writer.close();
} catch (FileNotFoundException e) {
}
files.add(filename);
}
return files;
}
萬(wàn)事俱備,只欠東風(fēng)。將上面的類都準(zhǔn)備好之后,排序算法很簡(jiǎn)單,代碼量非常少。對(duì)照上面算法思路來理解下面的算法就很容易了。
public void sort() {
List<Bin> bins = prepare();
while (true) {
// 取數(shù)組中最小的元素
MergeSource current = bins.get(0).source;
if (current.hasNext()) {
// 從輸入文件中取出下一個(gè)元素
Bin newBin = new Bin(current, current.next());
// 二分查找,也就是和數(shù)組中已有元素進(jìn)行比較
int index = Collections.binarySearch(bins, newBin);
if (index == 0) {
// 算法思路情況1
this.out.write(newBin);
} else {
// 算法思路情況2
if (index < 0) {
index = -(index+1);
}
bins.add(index, newBin);
Bin minBin = bins.remove(0);
this.out.write(minBin);
}
} else {
// 算法思路情況3:遇到文件尾
Bin minBin = bins.remove(0);
this.out.write(minBin);
if (bins.isEmpty()) {
break;
}
}
}
}
讀者可以直接將下面的代碼拷貝粘貼到 IDE 中運(yùn)行。
package leetcode;
import java.io.BufferedReader;
import java.io.Closeable;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.FileReader;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
public class DiskMergeSort implements Closeable {
public static List<String> generateFiles(int n, int minEntries, int maxEntries) {
List<String> files = new ArrayList<>();
for (int i = 0; i < n; i++) {
String filename = "input-" + i + ".txt";
PrintWriter writer;
try {
writer = new PrintWriter(new FileOutputStream(filename));
int entries = ThreadLocalRandom.current().nextInt(minEntries, maxEntries);
List<Integer> nums = new ArrayList<>();
for (int k = 0; k < entries; k++) {
int num = ThreadLocalRandom.current().nextInt(10000000);
nums.add(num);
}
Collections.sort(nums);
for (int num : nums) {
writer.println(num);
}
writer.close();
} catch (FileNotFoundException e) {
}
files.add(filename);
}
return files;
}
private List<MergeSource> sources;
private MergeOut out;
public DiskMergeSort(List<String> files, String outFilename) {
this.sources = new ArrayList<>();
for (String filename : files) {
this.sources.add(new MergeSource(filename));
}
this.out = new MergeOut(outFilename);
}
static class MergeOut implements Closeable {
private PrintWriter writer;
public MergeOut(String filename) {
try {
this.writer = new PrintWriter(new FileOutputStream(filename));
} catch (FileNotFoundException e) {
}
}
public void write(Bin bin) {
writer.println(bin.num);
}
@Override
public void close() throws IOException {
writer.flush();
writer.close();
}
}
static class MergeSource implements Closeable {
private BufferedReader reader;
private String cachedLine;
public MergeSource(String filename) {
try {
FileReader fr = new FileReader(filename);
this.reader = new BufferedReader(fr);
} catch (FileNotFoundException e) {
}
}
public boolean hasNext() {
String line;
try {
line = this.reader.readLine();
if (line == null || line.isEmpty()) {
return false;
}
this.cachedLine = line.trim();
return true;
} catch (IOException e) {
}
return false;
}
public int next() {
if (this.cachedLine == null) {
if (!hasNext()) {
throw new IllegalStateException("no content");
}
}
int num = Integer.parseInt(this.cachedLine);
this.cachedLine = null;
return num;
}
@Override
public void close() throws IOException {
this.reader.close();
}
}
static class Bin implements Comparable<Bin> {
int num;
MergeSource source;
Bin(MergeSource source, int num) {
this.source = source;
this.num = num;
}
@Override
public int compareTo(Bin o) {
return this.num - o.num;
}
}
public List<Bin> prepare() {
List<Bin> bins = new ArrayList<>();
for (MergeSource source : sources) {
Bin newBin = new Bin(source, source.next());
bins.add(newBin);
}
Collections.sort(bins);
return bins;
}
public void sort() {
List<Bin> bins = prepare();
while (true) {
MergeSource current = bins.get(0).source;
if (current.hasNext()) {
Bin newBin = new Bin(current, current.next());
int index = Collections.binarySearch(bins, newBin);
if (index == 0 || index == -1) {
this.out.write(newBin);
if (index == -1) {
throw new IllegalStateException("impossible");
}
} else {
if (index < 0) {
index = -index - 1;
}
bins.add(index, newBin);
Bin minBin = bins.remove(0);
this.out.write(minBin);
}
} else {
Bin minBin = bins.remove(0);
this.out.write(minBin);
if (bins.isEmpty()) {
break;
}
}
}
}
@Override
public void close() throws IOException {
for (MergeSource source : sources) {
source.close();
}
this.out.close();
}
public static void main(String[] args) throws IOException {
List<String> inputs = DiskMergeSort.generateFiles(100, 10000, 20000);
// 運(yùn)行多次看算法耗時(shí)
for (int i = 0; i < 20; i++) {
DiskMergeSort sorter = new DiskMergeSort(inputs, "output.txt");
long start = System.currentTimeMillis();
sorter.sort();
long duration = System.currentTimeMillis() - start;
System.out.printf("%dms\n", duration);
sorter.close();
}
}
}
本算法還有一個(gè)小缺陷,那就是如果輸入文件數(shù)量非常多,那么內(nèi)存中的數(shù)組就會(huì)特別大,對(duì)數(shù)組的插入刪除操作肯定會(huì)很耗時(shí),這時(shí)可以考慮使用 TreeSet 來代替數(shù)組,讀者們可以自行嘗試一下。
關(guān)于“在LevelDB數(shù)據(jù)庫(kù)中如何實(shí)現(xiàn)磁盤多路歸并排序”這篇文章就分享到這里了,希望以上內(nèi)容可以對(duì)大家有一定的幫助,使各位可以學(xué)到更多知識(shí),如果覺得文章不錯(cuò),請(qǐng)把它分享出去讓更多的人看到。
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。