溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Android多線程斷點續(xù)傳下載原理及實現(xiàn)

發(fā)布時間:2020-08-06 15:32:27 來源:網絡 閱讀:294 作者:Android丶VG 欄目:移動開發(fā)

這段時間看了看工作室的工具庫的下載組件,發(fā)現(xiàn)其存在一些問題:

1.下載核心邏輯有 bug,在暫停下載或下載失敗等情況時有概率無法順利完成下載。
2.雖然原來的設計是采用多線程斷點續(xù)傳的設計,但打了一下日志發(fā)現(xiàn)其實下載任務都是在同一個線程下串行執(zhí)行,并沒有起到加快下載速度的作用。

考慮到原來的代碼并不復雜,因此對這部分下載組件進行了重寫。這里記錄一下里面的多線程斷點續(xù)傳功能的實現(xiàn)。
Android多線程斷點續(xù)傳下載原理及實現(xiàn)

請查看完整的PDF版
(更多完整項目下載。未完待續(xù)。源碼。圖文知識后續(xù)上傳github。)
可以點擊關于我聯(lián)系我獲取完整PDF
(VX:mm14525201314)

多線程下載意義

首先我們談一談,多線程下載的意義。

在日常的場景下,網絡中不可能只有下載方與服務器之間這樣一條連接,為了避免在這樣的場景下的網絡擁塞,TCP 協(xié)議通過調節(jié)窗口的大小來避免出現(xiàn)擁塞,但這個窗口的大小可能沒辦法達到我們預期的效果:充分利用我們的帶寬。因此我們可以采用多個 TCP 連接的形式來提高我們帶寬的利用率,從而加快下載速度。

打個比喻就是我們要從一個水缸中用抽水機通過水管抽水,由于管子的直徑等等的限制,我們單條管子無法完全利用我們的抽水機的抽水動力。因此我們就將這些抽水的任務分成了多份,分攤到多個管子上,這樣就可以更充分的利用我們的抽水機動力,從而提高抽水的速度。

因此,我們使用多線程下載的主要意義就是——提高下載速度。

多線程下載原理

任務分配

前面提到了我們主要的目的是將一個總的下載任務分攤到多個子任務中,比如假設我們用 5 個線程下載這個文件,那么我們就可以對一個長度為 N 的任務進行如下圖的均分:
Android多線程斷點續(xù)傳下載原理及實現(xiàn)
但真實場景下往往 N 都不是剛好為 5 的倍數的,因此對于最后一個任務還需要加上剩余的任務量,也就是 N/5+N%5。

Http Range 請求頭

上面的任務分配我們已經了解了,看起來很理想,但有一個問題,我們如何實現(xiàn)向服務器只請求這個文件的某一段而不是全部呢?

我們可以通過在請求頭中加入 Range 字段來指定請求的范圍,從而實現(xiàn)指定某一段的數據。

如:RANGE bytes=10000-19999 就指定了 10000-19999 這段字節(jié)的數據

所以我們的核心思想就是通過它拿到文件對應字節(jié)段的 InputStream,然后對它讀取并寫入文件。

RandomAccessFile 文件寫入

下面再講講文件寫入問題,由于我們是多線程下載,因此文件并不是每次都是從前往后一個個字節(jié)寫入的,隨時可能在文件的任何一個地方寫入數據。因此我們需要能夠在文件的指定位置寫入數據。這里我們用到了RandomAccessFile 來實現(xiàn)這個功能。

RandomAccessFile 是一個隨機訪問文件類,同時整合了 FileOutputStreamFileInputStream,支持從文件的任何字節(jié)處讀寫數據。通過它我們就可以在文件的任何字節(jié)處寫入數據。

接下來簡單講講我們這里是如何使用 RandomAccessFile 的。我們對于每個子任務來說都有一個開始和結束的位置。每個任務都可以通過 RandomAccessFile::seek 跳轉到文件的對應字節(jié)位置,然后從該位置開始讀取 InputStream 并寫入。

這樣,就實現(xiàn)了不同線程對文件的隨機寫入。

文件大小的獲取

由于我們在真正開始下載之前,我們需要先將任務分配到各個線程,因此我們需要先了解到文件的大小。

為了獲取到文件的大小,我們用到 Response Headers 中的 Content-Length 字段。

如下圖所示,可以看到,打開該下載請求的鏈接后,Response Headers 中包含了我們需要的 Content-Length,也就是該文件的大小,單位是字節(jié)。
Android多線程斷點續(xù)傳下載原理及實現(xiàn)

斷點續(xù)傳原理

對于多個子任務,我們如何實現(xiàn)它們的斷點續(xù)傳呢?

其實原理很簡單,只需要保證每個子任務的下載進度能夠被即時地記錄即可。這樣繼續(xù)下載時只需要讀取這些下載記錄,從上次下載結束的位置開始下載即可。

它的實現(xiàn)有很多方式,只要能做到數據持久化即可。這里我使用的是數據庫來實現(xiàn)。

這樣,我們的子任務需要擁有一些必要的信息

  • completedSize:當前下載完成大小
  • taskSize:子任務總大小
  • startPos:子任務開始位置
  • currentPos:子任務進行到的位置
  • endPos:子任務結束位置

通過這些信息,我們就能夠記錄子任務的下載進度從而恢復我們之前的下載,實現(xiàn)斷點續(xù)傳。

代碼實現(xiàn)

下面我們用代碼來實現(xiàn)這樣一個多線程下載功能。

下載狀態(tài)

首先,我們定義一下下載中的各個狀態(tài):

public class DownloadStatus {
    public static final int IDLE = 233;                    // 空閑,默認狀態(tài)
    public static final int COMPLETED = 234;        // 完成
    public static final int DOWNLOADING = 235;    // 下載中
    public static final int PAUSE = 236;                // 暫停
    public static final int ERROR = 237;                // 出錯
}

可以看到,這里定義了如上的五種狀態(tài)。

基本輔助類的抽象

這里需要用到如數據庫及 HTTP 請求的功能,我們這里定義其接口如下,具體實現(xiàn)各位可以根據需要自己實現(xiàn):

數據庫輔助類
public interface DownloadDbHelper {
    /**
     * 從數據庫中刪除子任務記錄
     * @param task 子任務記錄
     */
    void delete(SubDownloadTask task);

    /**
     * 向數據庫中插入子任務記錄
     * @param task 子任務記錄
     */
    void insert(SubDownloadTask task);

    /**
     * 在數據庫中更新子任務記錄
     * @param task 子任務記錄
     */
    void update(SubDownloadTask task);

    /**
     * 獲取所有指定Task下的子任務記錄
     * @param taskTag Task的Tag
     * @return 子任務記錄
     */
    List<SubDownloadTask> queryByTaskTag(String taskTag);
}
Http 輔助類
public interface DownloadHttpHelper {

    /**
     * 獲取文件總長度
     * @param url 下載url
     * @param callback 獲取文件長度CallBack
     */
    void getTotalSize(String url, NetCallback<Long> callback);

    /**
     * 獲取InputStream
     * @param url 下載url
     * @param start 開始位置
     * @param end 結束位置
     * @param callback 獲取字節(jié)流的CallBack
     */
    void getStreamByRange(String url, long start, long end, NetCallback<InputStream> callback);
}
子任務實現(xiàn)
成員變量及解釋

我們先從上到下,從子任務開始實現(xiàn)。在我的設計中,它具有如下的成員變量:

@Entity
public class SubDownloadTask implements Runnable {
    public static final int BUFFER_SIZE = 1024 * 1024;
    private static final String TAG = SubDownloadTask.class.getSimpleName();

    @Id
    private Long id;
    private String url;                                            // 文件下載的 url
    private String taskTag;                                    // 父任務的 Tag
    private long taskSize;                                    // 子任務大小
    private long completedSize;                            // 子任務完成大小
    private long startPos;                                    // 開始位置
    private long currentPos;                                // 當前位置
    private long endPos;                                        // 結束位置
    private volatile int status;                        // 當前下載狀態(tài)
    @Transient
    private SubDownloadListener listener;        // 子任務下載監(jiān)聽,主要用于提示父任務
    @Transient
    private File saveFile;                                    // 要保存到的文件

    ...
}

由于這里的數據庫的操作是用 GreenDao 實現(xiàn),因此這里有一些相關注解,各位可以忽略。

InputStream 獲取

可以看到,子任務是一個 Runnable,我們可以通過其 run 方法開始下載,這樣就可以通過如 ExecutorService 來開啟多個線程執(zhí)行子任務。

我們看到其 run 方法:

@Override
public void run() {
    status = DownloadStatus.DOWNLOADING;
    DownloadManager.getInstance()
            .getHttpHelper()
            .getStreamByRange(url, currentPos, endPos, new NetCallback<InputStream>() {
                @Override
                public void onResult(InputStream inputStream) {
                    listener.onSubStart();
                    writeFile(inputStream);
                }
                @Override
                public void onError(String message) {
                    listener.onSubError("文件流獲取失敗");
                    status = DownloadStatus.ERROR;
                }
            });
}

可以看到,我們獲取了其從 currentPosendPos 端的字節(jié)流,通過其 Response Body 拿到了它的 InputStream,然后調用了 writeFile(InputStream) 方法進行文件的寫入。

文件寫入
接下來看到 writeFile 方法:

private void writeFile(InputStream in) {
    try {
        RandomAccessFile file = new RandomAccessFile(saveFile, "rwd");    // 通過 saveFile 建立RandomAccessFile
        file.seek(currentPos);    // 跳轉到對應位置

                byte[] buffer = new byte[BUFFER_SIZE];
        while (true) {
                // 循環(huán)讀取 InputStream,直到暫?;蜃x取結束
            if (status != DownloadStatus.DOWNLOADING) {
                    // 狀態(tài)不為 DOWNLOADING,停止下載
                break;
            }

            int offset = in.read(buffer, 0, BUFFER_SIZE);
            if (offset == -1) {
                    // 讀取不到數據,說明讀取結束
                break;
            }

                        // 將讀取到的數據寫入文件
            file.write(buffer, 0, offset);
            // 下載數據并在數據庫中更新
            currentPos += offset;
            completedSize += offset;
            DownloadManager.getInstance()
                .getDbHelper()
                .update(this);
            // 通知父任務下載進度
            listener.onSubDownloading(offset);
        }
        if(status == DownloadStatus.DOWNLOADING) {
            // 下載完成
            status = DownloadStatus.COMPLETED;
            // 通知父任務下載完成
            listener.onSubComplete(completedSize);
        }
        file.close();
        in.close();
    } catch (IOException e) {
        e.printStackTrace();
        listener.onSubError("文件下載失敗");
        status = DownloadStatus.ERROR;
        resetTask();
    }
}

具體流程可以看代碼中的注釋??梢钥吹?,子任務實際上就是循環(huán)讀取 InputStream,并寫入文件,同時將下載進度同步到數據庫。

父任務實現(xiàn)

父任務也就是我們具體的下載任務,我們同樣先看到成員變量:

public class DownloadTask implements SubDownloadListener {
    private static final String TAG = DownloadTask.class.getSimpleName();
    private String tag;                                                // 下載任務的 Tag,用于區(qū)分不同下載任務
    private String url;                                                // 下載 url
    private String savePath;                                    // 保存路徑
    private String fileName;                                    // 保存文件名
    private DownloadListener listener;                // 下載監(jiān)聽
    private long completeSize;                                // 下載完成大小
    private long totalSize;                                        // 下載任務總大小
    private int status;                                                // 當前下載進度
    private int threadNum;                                        // 線程數(由外部設置的每個任務的下載線程數)
    private File file;                                                // 保存文件
    private List<SubDownloadTask> subTasks;        // 子任務列表
    private ExecutorService mExecutorService;    // 線程池,用于執(zhí)行子任務

    ...
}
下載功能

對于一個下載任務,可以通過 download 方法開始執(zhí)行:

public void download() {
    listener.onStart();
    subTasks = querySubTasks();
    status = DownloadStatus.DOWNLOADING;
    if (subTasks.isEmpty()) {
        // 是新任務
        downloadNewTask();
    } else if (subTasks.size() == threadNum) {
        // 不是新任務
        downloadExistTask();
    } else {
        // 不是新任務,但下載線程數有誤
        listener.onError("斷點數據有誤");
        resetTask();
    }
}

可以看到,我們先將子任務列表從數據庫中讀取出來。

  • 如果子任務列表為空,則說明還沒有下載記錄,也就是說是一個新任務,調用 downloadNewTask 方法。
  • 如果子任務列表大小等于線程數,則說明其不是新任務,調用 downloadExistTask 方法。
  • 如果子任務列表大小不等于線程數,說明當前的下載記錄已不可用,于是重置下載任務,從新下載。

    下載新任務

    我們先看到 downloadNewTask 方法:

    DownloadManager.getInstance()
        .getHttpHelper()
        .getTotalSize(url, new NetCallback<Long>() {
            @Override
            public void onResult(Long total) {
                completeSize = 0L;
                totalSize = total;
                initSubTasks();
                startAsyncDownload();
            }
    
            @Override
            public void onError(String message) {
                error("獲取文件長度失敗");
            }
        });

    可以看到,獲取到總長度后,通過調用 initSubTasks 方法,對子任務列表進行了初始化(計算子任務長度等),然后調用了 startAsyncDownload 方法后通過 ExecutorService 運行子任務進入子任務進行下載。

我們看到 initSubTasks 方法:

private void initSubTasks() {
    long averageSize = totalSize / threadNum;
    for (int taskIndex = 0; taskIndex < threadNum; taskIndex++) {
        long taskSize = averageSize;
        if (taskIndex == threadNum - 1) {
            // 最后一個任務,則 size 還需要加入剩余量
            taskSize += totalSize % threadNum;
        }
        long start = 0L;
        int index = taskIndex;
        while (index > 0) {
            start += subTasks.get(index - 1).getTaskSize();
            index--;
        }
        long end = start + taskSize - 1;        // 注意這里
        SubDownloadTask subTask = new SubDownloadTask();
        subTask.setUrl(url);
        subTask.setStatus(DownloadStatus.IDLE);
        subTask.setTaskTag(tag);
        subTask.setCompletedSize(0);
        subTask.setTaskSize(taskSize);
        subTask.setStartPos(start);
        subTask.setCurrentPos(start);
        subTask.setEndPos(end);
        subTask.setSaveFile(file);
        subTask.setListener(this);
        DownloadManager.getInstance()
                .getDbHelper()
                .insert(subTask);
        subTasks.add(subTask);
    }
}

可以看到就是計算每個任務的大小及開始及結束點的位置,這里要注意的是 endPos 需要 -1,否則各個任務的下載位置會重疊,并且最后一個任務會多下載一個字節(jié)導致如文件損壞等影響。具體原因就是比如一個大小為 500 的文件,則應當是 0-499 而不是 0-500。

恢復舊任務

接下來我們看看 downloadExistTask 方法:

private void downloadExistTask() {
    // 不是新任務,且下載線程數無誤,計算已下載大小
    completeSize = countCompleteSize();
    totalSize = countTotalSize();
    startAsyncDownload();
}

這里其實很簡單,遍歷子任務列表計算已下載量及總任務量,并調用 startAsyncDownload 開始多線程下載。

執(zhí)行子任務

具體執(zhí)行子任務我們可以看到 startAsyncDownload 方法:

private void startAsyncDownload() {
    for (SubDownloadTask subTask : subTasks) {
        if (subTask.getCompletedSize() < subTask.getTaskSize()) {
            // 只下載沒有下載結束的子任務
            mExecutorService.execute(subTask);
        }
    }
}

可以看到,這里其實只是通過 ExecutorService 執(zhí)行對應子任務(Runnable)而已。

####暫停功能
我們接下來看到 pause 方法:

public void pause() {
    stopAsyncDownload();
    status = DownloadStatus.PAUSE;
    listener.onPause();
}

可以看到,這里只是調用了 stopAsyncDownload 方法停止子任務。

看到 stopAsyncDownload 方法:

private void stopAsyncDownload() {
    for (SubDownloadTask subTask : subTasks) {
        if (subTask.getStatus() != DownloadStatus.COMPLETED) {
            // 下載完成的不再取消
            subTask.cancel();
        }
    }
}

可以看到,調用了子任務的 cancel 方法。

繼續(xù)看到子任務的 cancel方法:

void cancel() {
    status = DownloadStatus.PAUSE;
    listener.onSubCancel();
}

這里很簡單,僅僅是將下載狀態(tài)設置為了 PAUSE,這樣在寫入文件的下一次 while 循環(huán)時便會中止循環(huán)從而結束 Runnable 的執(zhí)行。

取消功能

看到 cancel方法:

public void cancel() {
    stopAsyncDownload();
    resetTask();
    listener.onCancel();
}

可以看到和暫停的邏輯差不多,只是在暫停后還需要對子任務重置從而使得下次下載從頭開始。

底層到上層的通知機制

前面提到,外部可以通過 DownloadListener 監(jiān)聽下載的進度,下面是 DownloadListener接口的定義:

public interface DownloadListener {
    default void onStart() {}

    default void onDownloading(long progress, long total) {}

    default void onPause() {}

    default void onCancel() {}

    default void onComplete() {}

    default void onError(String message) {}
}

我們實時的下載進度其實是在子任務的保存文件過程中才能體現(xiàn)出來的,同樣,子任務的下載失敗也需要通知到 DownloadListener,這是怎么做到的呢?

前面提到了,我們還定義了一個 SubDownloadListener,其監(jiān)聽者就是子任務的父任務。通過監(jiān)聽我們可以將子任務狀態(tài)反饋到父任務,父任務再根據具體情況反饋數據給 DownloadListener。

public interface SubDownloadListener {
    void onSubStart();

    void onSubDownloading(int offset);

    void onSubCancel();

    void onSubComplete(long completeSize);

    void onSubError(String message);
}

比如之前看到,每次下載失敗我們都會調用 onSubError,每次讀取 offset 的數據都會調用 onSubDownload(offset),每個任務下載失敗都會調用 onSubComplete(completeSize)。這樣,我們子任務的下載狀態(tài)就成功返回給了上層。

我們接著看看上層是如何處理的:

 @Override
    public void onSubStart() {}

    @Override
    public void onSubDownloading(int offset) {
        synchronized (this) {
            completeSize = completeSize + offset;
            listener.onDownloading(completeSize, totalSize);
        }
    }

    @Override
    public void onSubCancel() {}

    @Override
    public void onSubComplete(long completeSize) {
        checkComplete();
    }

    @Override
    public void onSubError(String message) {
        error(message);
    }

可以看到,每次下載到一段數據,它都會把數據量返回上來,此時 completeSize 就加上了對應的 offset,然后再將新的 completeSize 通知給監(jiān)聽者,這樣就實現(xiàn)了下載進度的監(jiān)聽。這里之所以加鎖是因為會有多個線程(子任務線程)對 completeSize 進行操作,加鎖保證線程安全。

而每次有子任務完成,它都會調用 checkComplete 方法檢查是否下載完成,若每個子任務都下載完成,則說明任務下載完成,然后通知監(jiān)聽者。

同樣的,每次子任務出現(xiàn)錯誤,都會通知監(jiān)聽者出現(xiàn)錯誤,并做一些錯誤情況下的處理。

到這里,這篇文章就結束了,我們成功實現(xiàn)了多線程斷點續(xù)傳下載功能?;谶@個原理,我們可以做一些上層的封裝實現(xiàn)一個文件下載框架。

請查看完整的PDF版
(更多完整項目下載。未完待續(xù)。源碼。圖文知識后續(xù)上傳github。)
可以點擊關于我聯(lián)系我獲取完整PDF
(VX:mm14525201314)

向AI問一下細節(jié)

免責聲明:本站發(fā)布的內容(圖片、視頻和文字)以原創(chuàng)、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI