溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

Python如何編寫下載器

發(fā)布時(shí)間:2021-08-07 13:48:57 來源:億速云 閱讀:184 作者:小新 欄目:開發(fā)技術(shù)

這篇文章主要為大家展示了“Python如何編寫下載器”,內(nèi)容簡(jiǎn)而易懂,條理清晰,希望能夠幫助大家解決疑惑,下面讓小編帶領(lǐng)大家一起研究并學(xué)習(xí)一下“Python如何編寫下載器”這篇文章吧。

具體內(nèi)容如下

#!/bin/python3 
# author: lidawei 
# create: 2016-07-11 
# version: 1.0 
# 功能說明: 
#  從指定的URL將文件取回本地 
##################################################### 
 
import http.client 
import os 
import threading 
import time 
import logging 
import unittest 
from queue import Queue 
from urllib.parse import urlparse 
 
logging.basicConfig(level = logging.DEBUG, 
     format = '%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s', 
     datefmt = '%a, %d %b %Y %H:%M:%S', 
     filename = 'Downloader_%s.log' % (time.strftime('%Y-%m-%d')), 
     filemode = 'a') 
 
class Downloader(object): 
 '''''文件下載器''' 
 url = '' 
 filename = '' 
 
 def __init__(self, full_url_str, filename): 
  '''''初始化''' 
  self.url = urlparse(full_url_str) 
  self.filename = filename 
 
 def download(self): 
  '''''執(zhí)行下載,返回True或False''' 
  if self.url == '' or self.url == None or self.filename == '' or self.filename == None: 
   logging.error('Invalid parameter for Downloader') 
   return False 
 
  successed = False 
  conn = None 
  if self.url.scheme == 'https': 
   conn = http.client.HTTPSConnection(self.url.netloc) 
  else: 
   conn = http.client.HTTPConnection(self.url.netloc) 
  conn.request('GET', self.url.path) 
  response = conn.getresponse() 
  if response.status == 200: 
   total_size = response.getheader('Content-Length') 
   total_size = (int)(total_size) 
   if total_size > 0: 
    finished_size = 0 
    file = open(self.filename, 'wb') 
    if file: 
     progress = Progress() 
     progress.start() 
     while not response.closed: 
      buffers = response.read(1024) 
      file.write(buffers) 
 
      finished_size += len(buffers) 
      progress.update(finished_size, total_size) 
      if finished_size >= total_size: 
       break 
     # ... end while statment 
     file.close() 
     progress.stop() 
     progress.join() 
    else: 
     logging.error('Create local file %s failed' % (self.filename)) 
    # ... end if statment 
   else: 
    logging.error('Request file %s size failed' % (self.filename)) 
   # ... end if statment 
  else: 
   logging.error('HTTP/HTTPS request failed, status code:%d' % (response.status)) 
  # ... end if statment 
  conn.close() 
 
  return successed 
 # ... end download() method 
# ... end Downloader class 
 
class DataWriter(threading.Thread): 
 filename = '' 
 data_dict = {'offset' : 0, 'buffers_byte' : b''} 
 queue = Queue(128) 
 __stop = False 
 
 def __init__(self, filename): 
  self.filename = filename 
  threading.Thread.__init__(self) 
 
 #Override 
 def run(self): 
  while not self.__stop: 
   self.queue.get(True, 1) 
 
 def put_data(data_dict): 
  '''''將data_dict的數(shù)據(jù)放入隊(duì)列,data_dict是一個(gè)字典,有兩個(gè)元素:offset是偏移量,buffers_byte是二進(jìn)制字節(jié)串''' 
  self.queue.put(data_dict) 
 
 def stop(self): 
  self.__stop = True 
 
class Progress(threading.Thread): 
 interval = 1 
 total_size = 0 
 finished_size = 0 
 old_size = 0 
 __stop = False 
 
 def __init__(self, interval = 0.5): 
  self.interval = interval 
  threading.Thread.__init__(self) 
 
 #Override 
 def run(self): 
  # logging.info('  Total  Finished  Percent  Speed') 
  print('  Total  Finished  Percent  Speed') 
  while not self.__stop: 
   time.sleep(self.interval) 
   if self.total_size > 0: 
    percent = self.finished_size / self.total_size * 100 
    speed = (self.finished_size - self.old_size) / self.interval 
    msg = '%12d %12d %10.2f%% %12d' % (self.total_size, self.finished_size, percent, speed) 
    # logging.info(msg) 
    print(msg) 
 
    self.old_size = self.finished_size 
   else: 
    logging.error('Total size is zero') 
  # ... end while statment 
 # ... end run() method 
 
 def stop(self): 
  self.__stop = True 
 
 def update(self, finished_size, total_size): 
  self.finished_size = finished_size 
  self.total_size = total_size 
 
class TestDownloaderFunctions(unittest.TestCase): 
 
 def setUp(self): 
  print('setUp') 
 
 def test_download(self): 
  url = 'http://dldir1.qq.com/qqfile/qq/QQ8.4/18376/QQ8.4.exe' 
  filename = 'QQ8.4.exe' 
  dl = Downloader(url, filename) 
  dl.download() 
 
 def tearDown(self): 
  print('tearDown') 
 
if __name__ == '__main__': 
 unittest.main()

以上是“Python如何編寫下載器”這篇文章的所有內(nèi)容,感謝各位的閱讀!相信大家都有了一定的了解,希望分享的內(nèi)容對(duì)大家有所幫助,如果還想學(xué)習(xí)更多知識(shí),歡迎關(guān)注億速云行業(yè)資訊頻道!

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI