您好,登錄后才能下訂單哦!
前言
簡(jiǎn)單學(xué)習(xí)過(guò)網(wǎng)絡(luò)爬蟲(chóng),只是之前都是照著書上做并發(fā),大概能理解,卻還是無(wú)法自己用到自己項(xiàng)目中,這里自己研究實(shí)現(xiàn)一個(gè)網(wǎng)頁(yè)嗅探HTML5播放控件中基于m3u8協(xié)議ts格式視頻資源的項(xiàng)目,并未考慮過(guò)復(fù)雜情況,畢竟只是練練手.
源碼
# coding=utf-8 import asyncio import multiprocessing import os import re import time from math import floor from multiprocessing import Manager import aiohttp import requests from lxml import html import threading from src.my_lib import retry from src.my_lib import time_statistics class M3U8Download: _path = "./resource\\" # 本地文件路徑 _url_seed = None # 資源所在鏈接前綴 _target_url = {} # 資源任務(wù)目標(biāo)字典 _mode = "" _headers = {"User-agent": "Mozilla/5.0"} # 瀏覽器代理 _target_num = 100 def __init__(self): self._ml = Manager().list() # 進(jìn)程通信列表 if not os.path.exists(self._path): # 檢測(cè)本地目錄存在否 os.makedirs(self._path) exec_str = r'chcp 65001' os.system(exec_str) # 先切換utf-8輸出,防止控制臺(tái)亂碼 def sniffing(self, url): self._url = url print("開(kāi)始嗅探...") try: r = requests.get(self._url) # 訪問(wèn)嗅探網(wǎng)址,獲取網(wǎng)頁(yè)信息 except: print("嗅探失敗,網(wǎng)址不正確") os.system("pause") else: tree = html.fromstring(r.content) try: source_url = tree.xpath('//video//source/@src')[0] # 嗅探資源控制文件鏈接,這里只針對(duì)一個(gè)資源控制文件 # self._url_seed = re.split("/\w+\.m3u8", source_url)[0] # 從資源控制文件鏈接解析域名 except: print("嗅探失敗,未發(fā)現(xiàn)資源") os.system("pause") else: self.analysis(source_url) def analysis(self, source_url): try: self._url_seed = re.split("/\w+\.m3u8", source_url)[0] # 從資源控制文件鏈接解析域名 with requests.get(source_url) as r: # 訪問(wèn)資源控制文件,獲得資源信息 src = re.split("\n*#.+\n", r.text) # 解析資源信息 for sub_src in src: # 將資源地址儲(chǔ)存到任務(wù)字典 if sub_src: self._target_url[sub_src] = self._url_seed + "/" + sub_src except Exception as e: print("資源無(wú)法成功解析", e) os.system("pause") else: self._target_num = len(self._target_url) print("sniffing success!!!,found", self._target_num, "url.") self._mode = input( "1:-> 單進(jìn)程(Low B)\n2:-> 多進(jìn)程+多線程(網(wǎng)速開(kāi)始biubiu飛起!)\n3:-> 多進(jìn)程+協(xié)程(最先進(jìn)的并發(fā)!!!)\n") if self._mode == "1": for path, url in self._target_url.items(): self._download(path, url) elif self._mode == "2" or self._mode == "3": self._multiprocessing() def _multiprocessing(self, processing_num=4): # 多進(jìn)程,多線程 target_list = {} # 進(jìn)程任務(wù)字典,儲(chǔ)存每個(gè)進(jìn)程分配的任務(wù) pool = multiprocessing.Pool(processes=processing_num) # 開(kāi)啟進(jìn)程池 i = 0 # 任務(wù)分配標(biāo)識(shí) for path, url in self._target_url.items(): # 分配進(jìn)程任務(wù) target_list[path] = url i += 1 if i % 10 == 0 or i == len(self._target_url): # 每個(gè)進(jìn)程分配十個(gè)任務(wù) if self._mode == "2": pool.apply_async(self._sub_multithreading, kwds=target_list) # 使用多線程驅(qū)動(dòng)方法 else: pool.apply_async(self._sub_coroutine, kwds=target_list) # 使用協(xié)程驅(qū)動(dòng)方法 target_list = {} pool.close() # join函數(shù)等待所有子進(jìn)程結(jié)束 pool.join() # 調(diào)用join之前,先調(diào)用close函數(shù),否則會(huì)出錯(cuò)。執(zhí)行完close后不會(huì)有新的進(jìn)程加入到pool while True: if self._judge_over(): self._combine() break def _sub_multithreading(self, **kwargs): for path, url in kwargs.items(): # 根據(jù)進(jìn)程任務(wù)開(kāi)啟線程 t = threading.Thread(target=self._download, args=(path, url,)) t.start() @retry() def _download(self, path, url): # 同步下載方法 with requests.get(url, headers=self._headers) as r: if r.status_code == 200: with open(self._path + path, "wb")as file: file.write(r.content) self._ml.append(0) # 每成功一個(gè)就往進(jìn)程通信列表增加一個(gè)值 percent = '%.2f' % (len(self._ml) / self._target_num * 100) print(len(self._ml), ": ", path, "->OK", "\tcomplete:", percent, "%") # 顯示下載進(jìn)度 else: print(path, r.status_code, r.reason) def _sub_coroutine(self, **kwargs): tasks = [] for path, url in kwargs.items(): # 根據(jù)進(jìn)程任務(wù)創(chuàng)建協(xié)程任務(wù)列表 tasks.append(asyncio.ensure_future(self._async_download(path, url))) loop = asyncio.get_event_loop() # 創(chuàng)建異步事件循環(huán) loop.run_until_complete(asyncio.wait(tasks)) # 注冊(cè)任務(wù)列表 async def _async_download(self, path, url): # 異步下載方法 async with aiohttp.ClientSession() as session: async with session.get(url, headers=self._headers) as resp: try: assert resp.status == 200, "E" # 斷言狀態(tài)碼為200,否則拋異常,觸發(fā)重試裝飾器 with open(self._path + path, "wb")as file: file.write(await resp.read()) except Exception as e: print(e) else: self._ml.append(0) # 每成功一個(gè)就往進(jìn)程通信列表增加一個(gè)值 percent = '%.2f' % (len(self._ml) / self._target_num * 100) print(len(self._ml), ": ", path, "->OK", "\tcomplete:", percent, "%") # 顯示下載進(jìn)度 def _combine(self): # 組合資源方法 try: print("開(kāi)始組合資源...") identification = str(floor(time.time())) exec_str = r'copy /b "' + self._path + r'*.ts" "' + self._path + 'video' + identification + '.mp4"' os.system(exec_str) # 使用cmd命令將資源整合 exec_str = r'del "' + self._path + r'*.ts"' os.system(exec_str) # 刪除原來(lái)的文件 except: print("資源組合失敗") else: print("資源組合成功!") def _judge_over(self): # 判斷是否全部下載完成 if len(self._ml) == len(self._target_url): return True return False @time_statistics def app(): multiprocessing.freeze_support() url = input("輸入嗅探網(wǎng)址:\n") m3u8 = M3U8Download() m3u8.sniffing(url) # m3u8.analysis(url) if __name__ == "__main__": app()
這里是兩個(gè)裝飾器的實(shí)現(xiàn):
import time def time_statistics(fun): def function_timer(*args, **kwargs): t0 = time.time() result = fun(*args, **kwargs) t1 = time.time() print("Total time running %s: %s seconds" % (fun.__name__, str(t1 - t0))) return result return function_timer def retry(retries=3): def _retry(fun): def wrapper(*args, **kwargs): for _ in range(retries): try: return fun(*args, **kwargs) except Exception as e: print("@", fun.__name__, "->", e) return wrapper return _retry
打包成exe文件
使用PyInstaller -F download.py將程序打包成單個(gè)可執(zhí)行文件.
這里需要注意一下,因?yàn)槌绦蚝卸噙M(jìn)程,需要在執(zhí)行前加一句multiprocessing.freeze_support(),不然程序會(huì)反復(fù)執(zhí)行多進(jìn)程前的功能.
關(guān)于協(xié)程
協(xié)程在Python3.5進(jìn)化到了async await版本,用 async 標(biāo)記異步方法,在異步方法里對(duì)耗時(shí)操作使用await標(biāo)記.這里使用了一個(gè)進(jìn)程驅(qū)動(dòng)協(xié)程的方法,在進(jìn)程池創(chuàng)建多個(gè)協(xié)程任務(wù),使用asyncio.get_event_loop()創(chuàng)建協(xié)程事件循環(huán),使用run_until_complete()注冊(cè)協(xié)程任務(wù),asyncio.wait()方法接收一個(gè)任務(wù)列表進(jìn)行協(xié)程注冊(cè).
關(guān)于裝飾器
裝飾器源于閉包原理,這里使用了兩種裝飾器.
關(guān)于CMD控制臺(tái)
程序會(huì)使用CMD命令來(lái)將下載的ts文件合并.
因?yàn)镃MD默認(rèn)使用GB2312編碼,調(diào)用os.system()需要先切換成通用的UTF-8輸出,否則系統(tǒng)信息會(huì)亂碼.
而且使用cmd命令時(shí)參數(shù)最好加雙引號(hào),以避免特殊符號(hào)報(bào)錯(cuò).
以上就是本文的全部?jī)?nèi)容,希望對(duì)大家的學(xué)習(xí)有所幫助,也希望大家多多支持億速云。
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。