溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

在LevelDB數(shù)據(jù)庫(kù)中如何實(shí)現(xiàn)磁盤多路歸并排序

發(fā)布時(shí)間:2022-01-17 09:39:02 來源:億速云 閱讀:246 作者:小新 欄目:大數(shù)據(jù)

這篇文章將為大家詳細(xì)講解有關(guān)在LevelDB數(shù)據(jù)庫(kù)中如何實(shí)現(xiàn)磁盤多路歸并排序,小編覺得挺實(shí)用的,因此分享給大家做個(gè)參考,希望大家閱讀完這篇文章后可以有所收獲。

在 LevelDB 數(shù)據(jù)庫(kù)中高層數(shù)據(jù)下沉到低層時(shí)需要經(jīng)歷一次 Major Compaction,將高層文件的有序鍵值對(duì)和低層文件的多個(gè)有序鍵值對(duì)進(jìn)行歸并排序。磁盤多路歸并排序算法的輸入是來自多個(gè)磁盤文件的有序鍵值對(duì),在內(nèi)存中將這些文件的鍵值對(duì)進(jìn)行排序,然后輸出到一到多個(gè)新的磁盤文件中。

在LevelDB數(shù)據(jù)庫(kù)中如何實(shí)現(xiàn)磁盤多路歸并排序

多路歸并排序在大數(shù)據(jù)領(lǐng)域也是常用的算法,常用于海量數(shù)據(jù)排序。當(dāng)數(shù)據(jù)量特別大時(shí),這些數(shù)據(jù)無法被單個(gè)機(jī)器內(nèi)存容納,它需要被切分位多個(gè)集合分別由不同的機(jī)器進(jìn)行內(nèi)存排序(map 過程),然后再進(jìn)行多路歸并算法將來自多個(gè)不同機(jī)器的數(shù)據(jù)進(jìn)行排序(reduce 過程),這是流式多路歸并排序,為什么說是流式排序呢,因?yàn)閿?shù)據(jù)源來源于網(wǎng)絡(luò)套接字。

在LevelDB數(shù)據(jù)庫(kù)中如何實(shí)現(xiàn)磁盤多路歸并排序

多路歸并排序的優(yōu)勢(shì)在于內(nèi)存消耗極低,它的內(nèi)存占用和輸入文件的數(shù)量成正比,和數(shù)據(jù)總量無關(guān),數(shù)據(jù)總量只會(huì)線性正比影響排序的時(shí)間。

下面我們來親自實(shí)現(xiàn)一下磁盤多路歸并算法,為什么是磁盤,因?yàn)樗妮斎雭碜源疟P文件。

算法思路

我們需要在內(nèi)存里維護(hù)一個(gè)有序數(shù)組。每個(gè)輸入文件當(dāng)前最小的元素作為一個(gè)元素放在數(shù)組里。數(shù)組按照元素的大小保持排序狀態(tài)。

在LevelDB數(shù)據(jù)庫(kù)中如何實(shí)現(xiàn)磁盤多路歸并排序

接下來我們開始進(jìn)入循環(huán),循環(huán)的邏輯總是從最小的元素下手,在其所在的文件取出下一個(gè)元素,和當(dāng)前數(shù)組中的元素進(jìn)行比較。根據(jù)比較結(jié)果進(jìn)行不同的處理,這里我們使用二分查找算法進(jìn)行快速比較。注意每個(gè)輸入文件里面的元素都是有序的。

1. 如果取出來的元素和當(dāng)前數(shù)組中的最小元素相等,那么就可以直接將這個(gè)元素輸出。再繼續(xù)下一輪循環(huán)。不可能取出比當(dāng)前數(shù)組最小元素還要小的元素,因?yàn)檩斎胛募旧硪彩怯行虻摹?/p>

在LevelDB數(shù)據(jù)庫(kù)中如何實(shí)現(xiàn)磁盤多路歸并排序

2. 否則就需要將元素插入到當(dāng)前的數(shù)組中的指定位置,繼續(xù)保持?jǐn)?shù)組有序。然后將數(shù)組中當(dāng)前最小的元素輸出并移除。再進(jìn)行下一輪循環(huán)。

在LevelDB數(shù)據(jù)庫(kù)中如何實(shí)現(xiàn)磁盤多路歸并排序

3. 如果遇到文件結(jié)尾,那就無法繼續(xù)調(diào)用 next() 方法了,這時(shí)可以直接將數(shù)組中的最小元素輸出并移除,數(shù)組也跟著變小了。再進(jìn)行下一輪循環(huán)。當(dāng)數(shù)組空了,說明所有的文件都處理完了,算法就可以結(jié)束了。

在LevelDB數(shù)據(jù)庫(kù)中如何實(shí)現(xiàn)磁盤多路歸并排序


值得注意的是,數(shù)組中永遠(yuǎn)不會(huì)存在同一個(gè)文件的兩個(gè)元素,如此才保證了數(shù)組的長(zhǎng)度不會(huì)超過輸入文件的數(shù)量,同時(shí)它也不會(huì)把沒有結(jié)尾的文件擠出數(shù)組導(dǎo)致漏排序的問題。

二分查找

需要特別注意的是Java 內(nèi)置了二分查找算法在使用上比較精巧。

public class Collections {
  ...
  public static <T> int binarySearch(List<T> list, T key) {
    ...
    if (found) {
      return index;
    } else {
      return -(insertIndex+1);
    }
  }
  ...
}


如果 key 可以在 list 中找到,那就直接返回相應(yīng)的位置。如果找不到,它會(huì)返回負(fù)數(shù),還不是簡(jiǎn)單的 -1,這個(gè)負(fù)數(shù)指明了插入的位置,也就是說在這個(gè)位置插入 key,數(shù)組將可以繼續(xù)保持有序。

比如 binarySearch 返回了 index=-1,那么 insertIndex 就是 -(index+1),也就是 0,插入點(diǎn)在數(shù)組開頭。如果返回了 index=-size-1,那么 insertIndex 就是 size,是數(shù)組末尾。其它負(fù)數(shù)會(huì)插入數(shù)組中間。

在LevelDB數(shù)據(jù)庫(kù)中如何實(shí)現(xiàn)磁盤多路歸并排序

輸入文件類

對(duì)于每一個(gè)輸入文件都會(huì)創(chuàng)建一個(gè) MergeSource 對(duì)象,它提供了 hasNext() 和 next() 方法用于判斷和獲取下一個(gè)元素。注意輸入文件是有序的,下一個(gè)元素就是當(dāng)前輸入文件最小的元素。
hasNext() 方法負(fù)責(zé)讀取下一行并緩存在 cachedLine 變量中,調(diào)用 next() 方法將 cachedLine 變量轉(zhuǎn)換成整數(shù)并返回。

class MergeSource implements Closeable {
    private BufferedReader reader;
    private String cachedLine;
    private String filename;

    public MergeSource(String filename) {
        this.filename = filename;
        try {
            FileReader fr = new FileReader(filename);
            this.reader = new BufferedReader(fr);
        } catch (FileNotFoundException e) {
        }
    }

    public boolean hasNext() {
        String line;
        try {
            line = this.reader.readLine();
            if (line == null || line.isEmpty()) {
                return false;
            }
            this.cachedLine = line.trim();
            return true;
        } catch (IOException e) {
        }
        return false;
    }

    public int next() {
        if (this.cachedLine == null) {
            if (!hasNext()) {
                throw new IllegalStateException("no content");
            }
        }
        int num = Integer.parseInt(this.cachedLine);
        this.cachedLine = null;
        return num;
    }

    @Override
    public void close() throws IOException {
        this.reader.close();
    }
}

內(nèi)存有序數(shù)組元素類

在排序前先把這個(gè)數(shù)組準(zhǔn)備好,將每個(gè)輸入文件的最小元素放入數(shù)組,并排序。

class Bin implements Comparable<Bin> {
    int num;
    MergeSource source;

    Bin(MergeSource source, int num) {
        this.source = source;
        this.num = num;
    }

    @Override
    public int compareTo(Bin o) {
        return this.num - o.num;
    }

}

List<Bin> prepare() {
      List<Bin> bins = new ArrayList<>();
    for (MergeSource source : sources) {
        Bin newBin = new Bin(source, source.next());
        bins.add(newBin);
    }
    Collections.sort(bins);
    return bins;
}

輸出文件類

關(guān)閉輸出文件時(shí)注意要先 flush(),避免丟失 PrintWriter 中緩沖的內(nèi)容。

class MergeOut implements Closeable {
    private PrintWriter writer;

    public MergeOut(String filename) {
        try {
            FileOutputStream out = new FileOutputStream(filename);
            this.writer = new PrintWriter(out);
        } catch (FileNotFoundException e) {
        }
    }

    public void write(Bin bin) {
        writer.println(bin.num);
    }

    @Override
    public void close() throws IOException {
        writer.flush();
        writer.close();
    }
}

準(zhǔn)備輸入文件的內(nèi)容

下面我們來生成一系列輸入文件,每個(gè)輸入文件中包含一堆隨機(jī)整數(shù)。一共生成 n 個(gè)文件,每個(gè)文件的整數(shù)數(shù)量在 minEntries 到 minEntries 之間。返回所有輸入文件的文件名列表。

List<String> generateFiles(int n, int minEntries, int maxEntries) {
    List<String> files = new ArrayList<>();
    for (int i = 0; i < n; i++) {
        String filename = "input-" + i + ".txt";
        PrintWriter writer;
        try {
            writer = new PrintWriter(new FileOutputStream(filename));
            ThreadLocalRandom rand = ThreadLocalRandom.current();
            int entries = rand.nextInt(minEntries, maxEntries);
            List<Integer> nums = new ArrayList<>();
            for (int k = 0; k < entries; k++) {
                int num = rand.nextInt(10000000);
                nums.add(num);
            }
            Collections.sort(nums);
            for (int num : nums) {
                writer.println(num);
            }
            writer.flush();
            writer.close();
        } catch (FileNotFoundException e) {
        }
        files.add(filename);
    }
    return files;
}

排序算法

萬(wàn)事俱備,只欠東風(fēng)。將上面的類都準(zhǔn)備好之后,排序算法很簡(jiǎn)單,代碼量非常少。對(duì)照上面算法思路來理解下面的算法就很容易了。

public void sort() {
    List<Bin> bins = prepare();
    while (true) {
        // 取數(shù)組中最小的元素
        MergeSource current = bins.get(0).source;
        if (current.hasNext()) {
            // 從輸入文件中取出下一個(gè)元素
            Bin newBin = new Bin(current, current.next());
            // 二分查找,也就是和數(shù)組中已有元素進(jìn)行比較
            int index = Collections.binarySearch(bins, newBin);
            if (index == 0) {
                // 算法思路情況1
                this.out.write(newBin);
            } else {
                // 算法思路情況2
                if (index < 0) {
                    index = -(index+1);
                }
                bins.add(index, newBin);
                Bin minBin = bins.remove(0);
                this.out.write(minBin);
            }
        } else {
            // 算法思路情況3:遇到文件尾
            Bin minBin = bins.remove(0);
            this.out.write(minBin);
            if (bins.isEmpty()) {
                break;
            }
        }
    }
}

全部代碼

讀者可以直接將下面的代碼拷貝粘貼到 IDE 中運(yùn)行。

package leetcode;

import java.io.BufferedReader;
import java.io.Closeable;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.FileReader;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;

public class DiskMergeSort implements Closeable {

    public static List<String> generateFiles(int n, int minEntries, int maxEntries) {
        List<String> files = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            String filename = "input-" + i + ".txt";
            PrintWriter writer;
            try {
                writer = new PrintWriter(new FileOutputStream(filename));
                int entries = ThreadLocalRandom.current().nextInt(minEntries, maxEntries);
                List<Integer> nums = new ArrayList<>();
                for (int k = 0; k < entries; k++) {
                    int num = ThreadLocalRandom.current().nextInt(10000000);
                    nums.add(num);
                }
                Collections.sort(nums);
                for (int num : nums) {
                    writer.println(num);
                }
                writer.close();
            } catch (FileNotFoundException e) {
            }
            files.add(filename);
        }
        return files;
    }

    private List<MergeSource> sources;
    private MergeOut out;

    public DiskMergeSort(List<String> files, String outFilename) {
        this.sources = new ArrayList<>();
        for (String filename : files) {
            this.sources.add(new MergeSource(filename));
        }
        this.out = new MergeOut(outFilename);
    }

    static class MergeOut implements Closeable {
        private PrintWriter writer;

        public MergeOut(String filename) {
            try {
                this.writer = new PrintWriter(new FileOutputStream(filename));
            } catch (FileNotFoundException e) {
            }
        }

        public void write(Bin bin) {
            writer.println(bin.num);
        }

        @Override
        public void close() throws IOException {
            writer.flush();
            writer.close();
        }
    }

    static class MergeSource implements Closeable {
        private BufferedReader reader;
        private String cachedLine;

        public MergeSource(String filename) {
            try {
                FileReader fr = new FileReader(filename);
                this.reader = new BufferedReader(fr);
            } catch (FileNotFoundException e) {
            }
        }

        public boolean hasNext() {
            String line;
            try {
                line = this.reader.readLine();
                if (line == null || line.isEmpty()) {
                    return false;
                }
                this.cachedLine = line.trim();
                return true;
            } catch (IOException e) {
            }
            return false;
        }

        public int next() {
            if (this.cachedLine == null) {
                if (!hasNext()) {
                    throw new IllegalStateException("no content");
                }
            }
            int num = Integer.parseInt(this.cachedLine);
            this.cachedLine = null;
            return num;
        }

        @Override
        public void close() throws IOException {
            this.reader.close();
        }
    }

    static class Bin implements Comparable<Bin> {
        int num;
        MergeSource source;

        Bin(MergeSource source, int num) {
            this.source = source;
            this.num = num;
        }

        @Override
        public int compareTo(Bin o) {
            return this.num - o.num;
        }
    }

    public List<Bin> prepare() {
        List<Bin> bins = new ArrayList<>();
        for (MergeSource source : sources) {
            Bin newBin = new Bin(source, source.next());
            bins.add(newBin);
        }
        Collections.sort(bins);
        return bins;
    }

    public void sort() {
        List<Bin> bins = prepare();
        while (true) {
            MergeSource current = bins.get(0).source;
            if (current.hasNext()) {
                Bin newBin = new Bin(current, current.next());
                int index = Collections.binarySearch(bins, newBin);
                if (index == 0 || index == -1) {
                    this.out.write(newBin);
                    if (index == -1) {
                        throw new IllegalStateException("impossible");
                    }
                } else {
                    if (index < 0) {
                        index = -index - 1;
                    }
                    bins.add(index, newBin);
                    Bin minBin = bins.remove(0);
                    this.out.write(minBin);
                }
            } else {
                Bin minBin = bins.remove(0);
                this.out.write(minBin);
                if (bins.isEmpty()) {
                    break;
                }
            }
        }
    }

    @Override
    public void close() throws IOException {
        for (MergeSource source : sources) {
            source.close();
        }
        this.out.close();
    }

    public static void main(String[] args) throws IOException {
        List<String> inputs = DiskMergeSort.generateFiles(100, 10000, 20000);
        // 運(yùn)行多次看算法耗時(shí)
        for (int i = 0; i < 20; i++) {
            DiskMergeSort sorter = new DiskMergeSort(inputs, "output.txt");
            long start = System.currentTimeMillis();
            sorter.sort();
            long duration = System.currentTimeMillis() - start;
            System.out.printf("%dms\n", duration);
            sorter.close();
        }
    }
}


本算法還有一個(gè)小缺陷,那就是如果輸入文件數(shù)量非常多,那么內(nèi)存中的數(shù)組就會(huì)特別大,對(duì)數(shù)組的插入刪除操作肯定會(huì)很耗時(shí),這時(shí)可以考慮使用 TreeSet 來代替數(shù)組,讀者們可以自行嘗試一下。

關(guān)于“在LevelDB數(shù)據(jù)庫(kù)中如何實(shí)現(xiàn)磁盤多路歸并排序”這篇文章就分享到這里了,希望以上內(nèi)容可以對(duì)大家有一定的幫助,使各位可以學(xué)到更多知識(shí),如果覺得文章不錯(cuò),請(qǐng)把它分享出去讓更多的人看到。

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI