溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

Python3爬蟲如何維護(hù)代理池

發(fā)布時(shí)間:2020-07-31 09:03:47 來源:億速云 閱讀:200 作者:小豬 欄目:開發(fā)技術(shù)

這篇文章主要為大家展示了Python3爬蟲如何維護(hù)代理池,內(nèi)容簡(jiǎn)而易懂,希望大家可以學(xué)習(xí)一下,學(xué)習(xí)完之后肯定會(huì)有收獲的,下面讓小編帶大家一起來看看吧。

1. 準(zhǔn)備工作

要實(shí)現(xiàn)代理池我們首先需要成功安裝好了 Redis 數(shù)據(jù)庫并啟動(dòng)服務(wù),另外還需要安裝 Aiohttp、Requests、RedisPy、PyQuery、Flask 庫,如果沒有安裝可以參考第一章的安裝說明。

2. 代理池的目標(biāo)

代理池要做到易用、高效,我們一般需要做到下面的幾個(gè)目標(biāo):

基本模塊分為四塊,獲取模塊、存儲(chǔ)模塊、檢查模塊、接口模塊。

獲取模塊需要定時(shí)去各大代理網(wǎng)站抓取代理,代理可以是免費(fèi)公開代理也可以是付費(fèi)代理,代理的形式都是 IP 加端口,盡量從不同來源獲取,盡量抓取高匿代理,抓取完之后將可用代理保存到數(shù)據(jù)庫中。

存儲(chǔ)模塊負(fù)責(zé)存儲(chǔ)抓取下來的代理。首先我們需要保證代理不重復(fù),另外我們還需要標(biāo)識(shí)代理的可用情況,而且需要?jiǎng)討B(tài)實(shí)時(shí)處理每個(gè)代理,所以說,一種比較高效和方便的存儲(chǔ)方式就是使用 Redis 的 Sorted Set,也就是有序集合。

檢測(cè)模塊需要定時(shí)將數(shù)據(jù)庫中的代理進(jìn)行檢測(cè),在這里我們需要設(shè)置一個(gè)檢測(cè)鏈接,最好是爬取哪個(gè)網(wǎng)站就檢測(cè)哪個(gè)網(wǎng)站,這樣更加有針對(duì)性,如果要做一個(gè)通用型的代理,那可以設(shè)置百度等鏈接來檢測(cè)。另外我們需要標(biāo)識(shí)每一個(gè)代理的狀態(tài),如設(shè)置分?jǐn)?shù)標(biāo)識(shí),100 分代表可用,分?jǐn)?shù)越少代表越不可用,檢測(cè)一次如果可用,我們可以將其立即設(shè)置為100 滿分,也可以在原基礎(chǔ)上加 1 分,當(dāng)不可用,可以將其減 1 分,當(dāng)減到一定閾值后就直接從數(shù)據(jù)庫移除。通過這樣的標(biāo)識(shí)分?jǐn)?shù),我們就可以區(qū)分出代理的可用情況,選用的時(shí)候會(huì)更有針對(duì)性。

接口模塊需要用 API 來提供對(duì)外服務(wù)的接口,其實(shí)我們可以直接連數(shù)據(jù)庫來取,但是這樣就需要知道數(shù)據(jù)庫的連接信息,不太安全,而且需要配置連接,所以一個(gè)比較安全和方便的方式就是提供一個(gè) Web API 接口,通過訪問接口即可拿到可用代理。另外由于可用代理可能有多個(gè),我們可以提供隨機(jī)返回一個(gè)可用代理的接口,這樣保證每個(gè)可用代理都可以取到,實(shí)現(xiàn)負(fù)載均衡。

以上便是設(shè)計(jì)代理的一些基本思路,那么接下來我們就設(shè)計(jì)一下整體的架構(gòu),然后用代碼該實(shí)現(xiàn)代理池。

3. 代理池的架構(gòu)

根據(jù)上文的描述,代理池的架構(gòu)可以是這樣的,如圖 9-1 所示:

Python3爬蟲如何維護(hù)代理池

圖 9-1 代理池架構(gòu)

代理池分為四個(gè)部分,獲取模塊、存儲(chǔ)模塊、檢測(cè)模塊、接口模塊。

存儲(chǔ)模塊使用Redis的有序集合,用以代理的去重和狀態(tài)標(biāo)識(shí),同時(shí)它也是中心模塊和基礎(chǔ)模塊,將其他模塊串聯(lián)起來。

獲取模塊定時(shí)從代理網(wǎng)站獲取代理,將獲取的代理傳遞給存儲(chǔ)模塊,保存到數(shù)據(jù)庫。

檢測(cè)模塊定時(shí)通過存儲(chǔ)模塊獲取所有代理,并對(duì)其進(jìn)行檢測(cè),根據(jù)不同的檢測(cè)結(jié)果對(duì)代理設(shè)置不同的標(biāo)識(shí)。

接口模塊通過 Web API 提供服務(wù)接口,其內(nèi)部還是連接存儲(chǔ)模塊,獲取可用的代理。

4. 代理池的實(shí)現(xiàn)

接下來我們分別用代碼來實(shí)現(xiàn)一下這四個(gè)模塊。

存儲(chǔ)模塊

存儲(chǔ)在這里我們使用 Redis 的有序集合,集合的每一個(gè)元素都是不重復(fù)的,對(duì)于代理代理池來說,集合的元素就變成了一個(gè)個(gè)代理,也就是 IP 加端口的形式,如 60.207.237.111:8888,這樣的一個(gè)代理就是集合的一個(gè)元素。另外有序集合的每一個(gè)元素還都有一個(gè)分?jǐn)?shù)字段,分?jǐn)?shù)是可以重復(fù)的,是一個(gè)浮點(diǎn)數(shù)類型,也可以是整數(shù)類型。該集合會(huì)根據(jù)每一個(gè)元素的分?jǐn)?shù)對(duì)集合進(jìn)行排序,數(shù)值小的排在前面,數(shù)值大的排在后面,這樣就可以實(shí)現(xiàn)集合元素的排序了。

對(duì)于代理池來說,這個(gè)分?jǐn)?shù)可以作為我們判斷一個(gè)代理可用不可用的標(biāo)志,我們將 100 設(shè)為最高分,代表可用,0 設(shè)為最低分,代表不可用。從代理池中獲取代理的時(shí)候會(huì)隨機(jī)獲取分?jǐn)?shù)最高的代理,注意這里是隨機(jī),這樣可以保證每個(gè)可用代理都會(huì)被調(diào)用到。

分?jǐn)?shù)是我們判斷代理穩(wěn)定性的重要標(biāo)準(zhǔn),在這里我們?cè)O(shè)置分?jǐn)?shù)規(guī)則如下:

分?jǐn)?shù) 100 為可用,檢測(cè)器會(huì)定時(shí)循環(huán)檢測(cè)每個(gè)代理可用情況,一旦檢測(cè)到有可用的代理就立即置為 100,檢測(cè)到不可用就將分?jǐn)?shù)減 1,減至 0 后移除。

新獲取的代理添加時(shí)將分?jǐn)?shù)置為 10,當(dāng)測(cè)試可行立即置 100,不可行分?jǐn)?shù)減 1,減至 0 后移除。

這是一種解決方案,當(dāng)然可能還有更合理的方案。此方案的設(shè)置有一定的原因,在此總結(jié)如下:

當(dāng)檢測(cè)到代理可用時(shí)立即置為 100,這樣可以保證所有可用代理有更大的機(jī)會(huì)被獲取到。你可能會(huì)說為什么不直接將分?jǐn)?shù)加 1 而是直接設(shè)為最高 100 呢?設(shè)想一下,我們有的代理是從各大免費(fèi)公開代理網(wǎng)站獲取的,如果一個(gè)代理并沒有那么穩(wěn)定,平均五次請(qǐng)求有兩次成功,三次失敗,如果按照這種方式來設(shè)置分?jǐn)?shù),那么這個(gè)代理幾乎不可能達(dá)到一個(gè)高的分?jǐn)?shù),也就是說它有時(shí)是可用的,但是我們篩選是篩選的分?jǐn)?shù)最高的,所以這樣的代理就幾乎不可能被取到,當(dāng)然如果想追求代理穩(wěn)定性的化可以用這種方法,這樣可確保分?jǐn)?shù)最高的一定是最穩(wěn)定可用的。但是在這里我們采取可用即設(shè)置 100 的方法,確保只要可用的代理都可以被使用到。

當(dāng)檢測(cè)到代理不可用時(shí),將分?jǐn)?shù)減 1,減至 0 后移除,一共 100 次機(jī)會(huì),也就是說當(dāng)一個(gè)可用代理接下來如果嘗試了 100 次都失敗了,就一直減分直到移除,一旦成功就重新置回 100,嘗試機(jī)會(huì)越多代表將這個(gè)代理拯救回來的機(jī)會(huì)越多,這樣不容易將曾經(jīng)的一個(gè)可用代理丟棄,因?yàn)榇聿豢捎玫脑蚩赡苁蔷W(wǎng)絡(luò)繁忙或者其他人用此代理請(qǐng)求太過頻繁,所以在這里設(shè)置為 100 級(jí)。

新獲取的代理分?jǐn)?shù)設(shè)置為 10,檢測(cè)如果不可用就減 1,減到 0 就移除,如果可用就置 100。由于我們很多代理是從免費(fèi)網(wǎng)站獲取的,所以新獲取的代理無效的可能性是非常高的,可能不足 10%,所以在這里我們將其設(shè)置為 10,檢測(cè)的機(jī)會(huì)沒有可用代理 100 次那么多,這也可以適當(dāng)減少開銷。

以上便是代理分?jǐn)?shù)的一個(gè)設(shè)置思路,不一定是最優(yōu)思路,但個(gè)人實(shí)測(cè)實(shí)用性還是比較強(qiáng)的。

所以我們就需要定義一個(gè)類來操作數(shù)據(jù)庫的有序集合,定義一些方法來實(shí)現(xiàn)分?jǐn)?shù)的設(shè)置,代理的獲取等等。

實(shí)現(xiàn)如下:

MAX_SCORE = 100
MIN_SCORE = 0
INITIAL_SCORE = 10
REDIS_HOST = 'localhost'
REDIS_PORT = 6379
REDIS_PASSWORD = None
REDIS_KEY = 'proxies'
 
import redis
from random import choice
 
class RedisClient(object):
    def __init__(self, host=REDIS_HOST, port=REDIS_PORT, password=REDIS_PASSWORD):
        """
        初始化
        :param host: Redis 地址
        :param port: Redis 端口
        :param password: Redis密碼
        """
        self.db = redis.StrictRedis(host=host, port=port, password=password, decode_responses=True)
 
    def add(self, proxy, score=INITIAL_SCORE):
        """
        添加代理,設(shè)置分?jǐn)?shù)為最高
        :param proxy: 代理
        :param score: 分?jǐn)?shù)
        :return: 添加結(jié)果
        """
        if not self.db.zscore(REDIS_KEY, proxy):
            return self.db.zadd(REDIS_KEY, score, proxy)
 
    def random(self):
        """
        隨機(jī)獲取有效代理,首先嘗試獲取最高分?jǐn)?shù)代理,如果不存在,按照排名獲取,否則異常
        :return: 隨機(jī)代理
        """
        result = self.db.zrangebyscore(REDIS_KEY, MAX_SCORE, MAX_SCORE)
        if len(result):
            return choice(result)
        else:
            result = self.db.zrevrange(REDIS_KEY, 0, 100)
            if len(result):
                return choice(result)
            else:
                raise PoolEmptyError
 
    def decrease(self, proxy):
        """
        代理值減一分,小于最小值則刪除
        :param proxy: 代理
        :return: 修改后的代理分?jǐn)?shù)
        """
        score = self.db.zscore(REDIS_KEY, proxy)
        if score and score > MIN_SCORE:
            print('代理', proxy, '當(dāng)前分?jǐn)?shù)', score, '減1')
            return self.db.zincrby(REDIS_KEY, proxy, -1)
        else:
            print('代理', proxy, '當(dāng)前分?jǐn)?shù)', score, '移除')
            return self.db.zrem(REDIS_KEY, proxy)
 
    def exists(self, proxy):
        """
        判斷是否存在
        :param proxy: 代理
        :return: 是否存在
        """
        return not self.db.zscore(REDIS_KEY, proxy) == None
 
    def max(self, proxy):
        """
        將代理設(shè)置為MAX_SCORE
        :param proxy: 代理
        :return: 設(shè)置結(jié)果
        """
        print('代理', proxy, '可用,設(shè)置為', MAX_SCORE)
        return self.db.zadd(REDIS_KEY, MAX_SCORE, proxy)
 
    def count(self):
        """
        獲取數(shù)量
        :return: 數(shù)量
        """
        return self.db.zcard(REDIS_KEY)
 
    def all(self):
        """
        獲取全部代理
        :return: 全部代理列表
        """
        return self.db.zrangebyscore(REDIS_KEY, MIN_SCORE, MAX_SCORE)

首先定義了一些常量,如 MAX_SCORE、MIN_SCORE、INITIAL_SCORE 分別代表最大分?jǐn)?shù)、最小分?jǐn)?shù)、初始分?jǐn)?shù)。REDIS_HOST、REDIS_PORT、REDIS_PASSWORD 分別代表了 Redis 的連接信息,即地址、端口、密碼。REDIS_KEY 是有序集合的鍵名,可以通過它來獲取代理存儲(chǔ)所使用的有序集合。

接下來定義了一個(gè) RedisClient 類,用以操作 Redis 的有序集合,其中定義了一些方法來對(duì)集合中的元素進(jìn)行處理,主要功能如下:

init() 方法是初始化的方法,參數(shù)是Redis的連接信息,默認(rèn)的連接信息已經(jīng)定義為常量,在 init() 方法中初始化了一個(gè) StrictRedis 的類,建立 Redis 連接。這樣當(dāng) RedisClient 類初始化的時(shí)候就建立了Redis的連接。

add() 方法向數(shù)據(jù)庫添加代理并設(shè)置分?jǐn)?shù),默認(rèn)的分?jǐn)?shù)是 INITIAL_SCORE 也就是 10,返回結(jié)果是添加的結(jié)果。

random() 方法是隨機(jī)獲取代理的方法,首先獲取 100 分的代理,然后隨機(jī)選擇一個(gè)返回,如果不存在 100 分的代理,則按照排名來獲取,選取前 100 名,然后隨機(jī)選擇一個(gè)返回,否則拋出異常。

decrease() 方法是在代理檢測(cè)無效的時(shí)候設(shè)置分?jǐn)?shù)減 1 的方法,傳入代理,然后將此代理的分?jǐn)?shù)減 1,如果達(dá)到最低值,那么就刪除。

exists() 方法判斷代理是否存在集合中

max() 方法是將代理的分?jǐn)?shù)設(shè)置為 MAX_SCORE,即 100,也就是當(dāng)代理有效時(shí)的設(shè)置。

count() 方法返回當(dāng)前集合的元素個(gè)數(shù)。

all() 方法返回所有的代理列表,供檢測(cè)使用。

定義好了這些方法,我們可以在后續(xù)的模塊中調(diào)用此類來連接和操作數(shù)據(jù)庫,非常方便。如我們想要獲取隨機(jī)可用的代理,只需要調(diào)用 random() 方法即可,得到的就是隨機(jī)的可用代理。

獲取模塊

獲取模塊的邏輯相對(duì)簡(jiǎn)單,首先需要定義一個(gè) Crawler 來從各大網(wǎng)站抓取代理,示例如下:

import json
from .utils import get_page
from pyquery import PyQuery as pq
 
class ProxyMetaclass(type):
    def __new__(cls, name, bases, attrs):
        count = 0
        attrs['__CrawlFunc__'] = []
        for k, v in attrs.items():
            if 'crawl_' in k:
                attrs['__CrawlFunc__'].append(k)
                count += 1
        attrs['__CrawlFuncCount__'] = count
        return type.__new__(cls, name, bases, attrs)
 
class Crawler(object, metaclass=ProxyMetaclass):
    def get_proxies(self, callback):
        proxies = []
        for proxy in eval("self.{}()".format(callback)):
            print('成功獲取到代理', proxy)
            proxies.append(proxy)
        return proxies
 
    def crawl_daili66(self, page_count=4):
        """
        獲取代理66
        :param page_count: 頁碼
        :return: 代理
        """
        start_url = 'http://www.66ip.cn/{}.html'
        urls = [start_url.format(page) for page in range(1, page_count + 1)]
        for url in urls:
            print('Crawling', url)
            html = get_page(url)
            if html:
                doc = pq(html)
                trs = doc('.containerbox table tr:gt(0)').items()
                for tr in trs:
                    ip = tr.find('td:nth-child(1)').text()
                    port = tr.find('td:nth-child(2)').text()
                    yield ':'.join([ip, port])
 
    def crawl_proxy360(self):
        """
        獲取Proxy360
        :return: 代理
        """
        start_url = 'http://www.proxy#/Region/China'
        print('Crawling', start_url)
        html = get_page(start_url)
        if html:
            doc = pq(html)
            lines = doc('div[name="list_proxy_ip"]').items()
            for line in lines:
                ip = line.find('.tbBottomLine:nth-child(1)').text()
                port = line.find('.tbBottomLine:nth-child(2)').text()
                yield ':'.join([ip, port])
 
    def crawl_goubanjia(self):
        """
        獲取Goubanjia
        :return: 代理
        """
        start_url = 'http://www.goubanjia.com/free/gngn/index.shtml'
        html = get_page(start_url)
        if html:
            doc = pq(html)
            tds = doc('td.ip').items()
            for td in tds:
                td.find('p').remove()
                yield td.text().replace(' ', '')

為了實(shí)現(xiàn)靈活,在這里我們將獲取代理的一個(gè)個(gè)方法統(tǒng)一定義一個(gè)規(guī)范,如統(tǒng)一定義以 crawl 開頭,這樣擴(kuò)展的時(shí)候只需要添加 crawl 開頭的方法即可。

在這里實(shí)現(xiàn)了幾個(gè)示例,如抓取代理 66、Proxy360、Goubanjia 三個(gè)免費(fèi)代理網(wǎng)站,這些方法都定義成了生成器,通過 yield 返回一個(gè)個(gè)代理。首先將網(wǎng)頁獲取,然后用PyQuery 解析,解析出IP加端口的形式的代理然后返回。

然后定義了一個(gè) get_proxies() 方法,將所有以 crawl 開頭的方法調(diào)用一遍,獲取每個(gè)方法返回的代理并組合成列表形式返回。

你可能會(huì)想知道是怎樣獲取了所有以 crawl 開頭的方法名稱的。其實(shí)這里借助于元類來實(shí)現(xiàn),定義了一個(gè) ProxyMetaclass,Crawl 類將它設(shè)置為元類,元類中實(shí)現(xiàn)了 new() 方法,這個(gè)方法有固定的幾個(gè)參數(shù),其中第四個(gè)參數(shù) attrs 中包含了類的一些屬性,這其中就包含了類中方法的一些信息,我們可以遍歷 attrs 這個(gè)變量即可獲取類的所有方法信息。所以在這里我們?cè)?new() 方法中遍歷了 attrs 的這個(gè)屬性,就像遍歷一個(gè)字典一樣,鍵名對(duì)應(yīng)的就是方法的名稱,接下來判斷其開頭是否是 crawl,如果是,則將其加入到 CrawlFunc 屬性中,這樣我們就成功將所有以 crawl 開頭的方法定義成了一個(gè)屬性,就成功動(dòng)態(tài)地獲取到所有以 crawl 開頭的方法列表了。

所以說,如果要做擴(kuò)展的話,我們只需要添加一個(gè)以 crawl開頭的方法,例如抓取快代理,我們只需要在 Crawler 類中增加 crawl_kuaidaili() 方法,仿照其他的幾個(gè)方法將其定義成生成器,抓取其網(wǎng)站的代理,然后通過 yield 返回代理即可,所以這樣我們可以非常方便地?cái)U(kuò)展,而不用關(guān)心類其他部分的實(shí)現(xiàn)邏輯。

代理網(wǎng)站的添加非常靈活,不僅可以添加免費(fèi)代理,也可以添加付費(fèi)代理,一些付費(fèi)代理的提取方式其實(shí)也類似,也是通過 Web 的形式獲取,然后進(jìn)行解析,解析方式可能更加簡(jiǎn)單,如解析純文本或 Json,解析之后以同樣的方式返回即可,在此不再添加,可以自行擴(kuò)展。

既然定義了這個(gè) Crawler 類,我們就要調(diào)用啊,所以在這里再定義一個(gè) Getter 類,動(dòng)態(tài)地調(diào)用所有以 crawl 開頭的方法,然后獲取抓取到的代理,將其加入到數(shù)據(jù)庫存儲(chǔ)起來。

from db import RedisClient
from crawler import Crawler
 
POOL_UPPER_THRESHOLD = 10000
 
class Getter():
    def __init__(self):
        self.redis = RedisClient()
        self.crawler = Crawler()
 
    def is_over_threshold(self):
        """
        判斷是否達(dá)到了代理池限制
        """
        if self.redis.count() >= POOL_UPPER_THRESHOLD:
            return True
        else:
            return False
 
    def run(self):
        print('獲取器開始執(zhí)行')
        if not self.is_over_threshold():
            for callback_label in range(self.crawler.__CrawlFuncCount__):
                callback = self.crawler.__CrawlFunc__[callback_label]
                proxies = self.crawler.get_proxies(callback)
                for proxy in proxies:
                    self.redis.add(proxy)

Getter 類就是獲取器類,這其中定義了一個(gè)變量 POOL_UPPER_THRESHOLD 表示代理池的最大數(shù)量,這個(gè)數(shù)量可以靈活配置,然后定義了 is_over_threshold() 方法判斷代理池是否已經(jīng)達(dá)到了容量閾值,它就是調(diào)用了 RedisClient 的 count() 方法獲取代理的數(shù)量,然后加以判斷,如果數(shù)量達(dá)到閾值則返回 True,否則 False。如果不想加這個(gè)限制可以將此方法永久返回 True。

接下來定義了 run() 方法,首先判斷了代理池是否達(dá)到閾值,然后在這里就調(diào)用了 Crawler 類的 CrawlFunc 屬性,獲取到所有以 crawl 開頭的方法列表,依次通過 get_proxies() 方法調(diào)用,得到各個(gè)方法抓取到的代理,然后再利用 RedisClient 的 add() 方法加入數(shù)據(jù)庫,這樣獲取模塊的工作就完成了。

檢測(cè)模塊

在獲取模塊中,我們已經(jīng)成功將各個(gè)網(wǎng)站的代理獲取下來了,然后就需要一個(gè)檢測(cè)模塊來對(duì)所有的代理進(jìn)行一輪輪的檢測(cè),檢測(cè)可用就設(shè)置為 100,不可用就分?jǐn)?shù)減 1,這樣就可以實(shí)時(shí)改變每個(gè)代理的可用情況,在獲取有效代理的時(shí)候只需要獲取分?jǐn)?shù)高的代理即可。

由于代理的數(shù)量非常多,為了提高代理的檢測(cè)效率,我們?cè)谶@里使用異步請(qǐng)求庫 Aiohttp 來進(jìn)行檢測(cè)。

Requests 作為一個(gè)同步請(qǐng)求庫,我們?cè)诎l(fā)出一個(gè)請(qǐng)求之后需要等待網(wǎng)頁加載完成之后才能繼續(xù)執(zhí)行程序。也就是這個(gè)過程會(huì)阻塞在等待響應(yīng)這個(gè)過程,如果服務(wù)器響應(yīng)非常慢,比如一個(gè)請(qǐng)求等待十幾秒,那么我們使用 Requests 完成一個(gè)請(qǐng)求就會(huì)需要十幾秒的時(shí)間,中間其實(shí)就是一個(gè)等待響應(yīng)的過程,程序也不會(huì)繼續(xù)往下執(zhí)行,而這十幾秒的時(shí)間其實(shí)完全可以去做其他的事情,比如調(diào)度其他的請(qǐng)求或者進(jìn)行網(wǎng)頁解析等等。

異步請(qǐng)求庫就解決了這個(gè)問題,它類似 JavaScript 中的回調(diào),意思是說在請(qǐng)求發(fā)出之后,程序可以繼續(xù)接下去執(zhí)行去做其他的事情,當(dāng)響應(yīng)到達(dá)時(shí),會(huì)通知程序再去處理這個(gè)響應(yīng),這樣程序就沒有被阻塞,充分把時(shí)間和資源利用起來,大大提高效率。

對(duì)于響應(yīng)速度比較快的網(wǎng)站,可能 Requests 同步請(qǐng)求和 Aiohttp 異步請(qǐng)求的效果差距沒那么大,可對(duì)于檢測(cè)代理這種事情,一般是需要十多秒甚至幾十秒的時(shí)間,這時(shí)候使用 Aiohttp 異步請(qǐng)求庫的優(yōu)勢(shì)就大大體現(xiàn)出來了,效率可能會(huì)提高幾十倍不止。

所以在這里我們的代理檢測(cè)使用異步請(qǐng)求庫 Aiohttp,實(shí)現(xiàn)示例如下:

VALID_STATUS_CODES = [200]
TEST_URL = 'http://www.baidu.com'
BATCH_TEST_SIZE = 100
 
class Tester(object):
    def __init__(self):
        self.redis = RedisClient()
 
    async def test_single_proxy(self, proxy):
        """
        測(cè)試單個(gè)代理
        :param proxy: 單個(gè)代理
        :return: None
        """
        conn = aiohttp.TCPConnector(verify_ssl=False)
        async with aiohttp.ClientSession(connector=conn) as session:
            try:
                if isinstance(proxy, bytes):
                    proxy = proxy.decode('utf-8')
                real_proxy = 'http://' + proxy
                print('正在測(cè)試', proxy)
                async with session.get(TEST_URL, proxy=real_proxy, timeout=15) as response:
                    if response.status in VALID_STATUS_CODES:
                        self.redis.max(proxy)
                        print('代理可用', proxy)
                    else:
                        self.redis.decrease(proxy)
                        print('請(qǐng)求響應(yīng)碼不合法', proxy)
            except (ClientError, ClientConnectorError, TimeoutError, AttributeError):
                self.redis.decrease(proxy)
                print('代理請(qǐng)求失敗', proxy)
 
    def run(self):
        """
        測(cè)試主函數(shù)
        :return: None
        """
        print('測(cè)試器開始運(yùn)行')
        try:
            proxies = self.redis.all()
            loop = asyncio.get_event_loop()
            # 批量測(cè)試
            for i in range(0, len(proxies), BATCH_TEST_SIZE):
                test_proxies = proxies[i:i + BATCH_TEST_SIZE]
                tasks = [self.test_single_proxy(proxy) for proxy in test_proxies]
                loop.run_until_complete(asyncio.wait(tasks))
                time.sleep(5)
        except Exception as e:
            print('測(cè)試器發(fā)生錯(cuò)誤', e.args)

在這里定義了一個(gè)類 Tester,init() 方法中建立了一個(gè) RedisClient 對(duì)象,供類中其他方法使用。接下來定義了一個(gè) test_single_proxy() 方法,用來檢測(cè)單個(gè)代理的可用情況,其參數(shù)就是被檢測(cè)的代理,注意這個(gè)方法前面加了 async 關(guān)鍵詞,代表這個(gè)方法是異步的,方法內(nèi)部首先創(chuàng)建了 Aiohttp 的 ClientSession 對(duì)象,此對(duì)象類似于 Requests 的 Session 對(duì)象,可以直接調(diào)用該對(duì)象的 get() 方法來訪問頁面,在這里代理的設(shè)置方式是通過 proxy 參數(shù)傳遞給 get() 方法,請(qǐng)求方法前面也需要加上 async 關(guān)鍵詞標(biāo)明是異步請(qǐng)求,這也是 Aiohttp 使用時(shí)的常見寫法。

測(cè)試的鏈接在這里定義常量為 TEST_URL,如果針對(duì)某個(gè)網(wǎng)站有抓取需求,建議將 TEST_URL 設(shè)置為目標(biāo)網(wǎng)站的地址,因?yàn)樵谧ト〉倪^程中,可能代理本身是可用的,但是該代理的 IP 已經(jīng)被目標(biāo)網(wǎng)站封掉了。例如,如要抓取知乎,可能其中某些代理是可以正常使用,比如訪問百度等頁面是完全沒有問題的,但是可能對(duì)知乎來說可能就被封了,所以可以將 TEST_URL 設(shè)置為知乎的某個(gè)頁面的鏈接,當(dāng)請(qǐng)求失敗時(shí),當(dāng)代理被封時(shí),分?jǐn)?shù)自然會(huì)減下來,就不會(huì)被取到了。

如果想做一個(gè)通用的代理池,則不需要專門設(shè)置 TEST_URL,可以設(shè)置為一個(gè)不會(huì)封 IP 的網(wǎng)站,也可以設(shè)置為百度這類響應(yīng)穩(wěn)定的網(wǎng)站。

另外我們還定義了 VALID_STATUS_CODES 變量,是一個(gè)列表形式,包含了正常的狀態(tài)碼,如可以定義成 [200],當(dāng)然對(duì)于某些檢測(cè)目標(biāo)網(wǎng)站可能會(huì)出現(xiàn)其他的狀態(tài)碼也是正常的,可以自行配置。

獲取 Response 后需要判斷響應(yīng)的狀態(tài),如果狀態(tài)碼在 VALID_STATUS_CODES 這個(gè)列表里,則代表代理可用,調(diào)用 RedisClient 的 max() 方法將代理分?jǐn)?shù)設(shè)為 100,否則調(diào)用 decrease() 方法將代理分?jǐn)?shù)減 1,如果出現(xiàn)異常也同樣將代理分?jǐn)?shù)減 1。

另外在測(cè)試的時(shí)候設(shè)置了批量測(cè)試的最大值 BATCH_TEST_SIZE 為 100,也就是一批測(cè)試最多測(cè)試 100個(gè),這可以避免當(dāng)代理池過大時(shí)全部測(cè)試導(dǎo)致內(nèi)存開銷過大的問題。

隨后在 run() 方法里面獲取了所有的代理列表,使用 Aiohttp 分配任務(wù),啟動(dòng)運(yùn)行,這樣就可以進(jìn)行異步檢測(cè)了,寫法可以參考 Aiohttp 的官方示例:http://aiohttp.readthedocs.io/。

這樣測(cè)試模塊的邏輯就完成了。

接口模塊

通過上述三個(gè)模塊我們已經(jīng)可以做到代理的獲取、檢測(cè)和更新了,數(shù)據(jù)庫中就會(huì)以有序集合的形式存儲(chǔ)各個(gè)代理還有對(duì)應(yīng)的分?jǐn)?shù),分?jǐn)?shù) 100 代表可用,分?jǐn)?shù)越小代表越不可用。

但是我們?cè)鯓觼矸奖愕孬@取可用代理呢?用 RedisClient 類來直接連接 Redis 然后調(diào)用 random() 方法獲取當(dāng)然沒問題,這樣做效率很高,但是有這么幾個(gè)弊端:

需要知道 Redis 的用戶名和密碼,如果這個(gè)代理池是給其他人使用的就需要告訴他連接的用戶名和密碼信息,這樣是很不安全的。

代理池如果想持續(xù)運(yùn)行需要部署在遠(yuǎn)程服務(wù)器上運(yùn)行,如果遠(yuǎn)程服務(wù)器的 Redis 是只允許本地連接的,那么就沒有辦法遠(yuǎn)程直連 Redis 獲取代理了。

如果爬蟲所在的主機(jī)沒有連接 Redis 的模塊,或者爬蟲不是由 Python 語言編寫的,那么就無法使用 RedisClient 來獲取代理了。

如果 RedisClient 類或者數(shù)據(jù)庫結(jié)構(gòu)有更新,那么在爬蟲端還需要去同步這些更新。

綜上考慮,為了使得代理池可以作為一個(gè)獨(dú)立服務(wù)運(yùn)行,我們最好增加一個(gè)接口模塊,以 Web API 的形式暴露可用代理。

這樣獲取代理只需要請(qǐng)求一下接口即可,以上的幾個(gè)缺點(diǎn)弊端可以解決。

我們?cè)谶@里使用一個(gè)比較輕量級(jí)的庫 Flask 來實(shí)現(xiàn)這個(gè)接口模塊,實(shí)現(xiàn)示例如下:

from flask import Flask, g
from db import RedisClient
 
__all__ = ['app']
app = Flask(__name__)
 
def get_conn():
    if not hasattr(g, 'redis'):
        g.redis = RedisClient()
    return g.redis
 
@app.route('/')
def index():
    return '<h3>Welcome to Proxy Pool System</h3>'
 
@app.route('/random')
def get_proxy():
    """
    獲取隨機(jī)可用代理
    :return: 隨機(jī)代理
    """
    conn = get_conn()
    return conn.random()
 
@app.route('/count')
def get_counts():
    """
    獲取代理池總量
    :return: 代理池總量
    """
    conn = get_conn()
    return str(conn.count())
 
if __name__ == '__main__':
    app.run()

在這里我們聲明了一個(gè) Flask 對(duì)象,定義了三個(gè)接口,分別是首頁、隨機(jī)代理頁、獲取數(shù)量頁。

運(yùn)行之后 Flask 會(huì)啟動(dòng)一個(gè) Web 服務(wù),我們只需要訪問對(duì)應(yīng)的接口即可獲取到可用代理。

調(diào)度模塊

這個(gè)模塊其實(shí)就是調(diào)用以上所定義的三個(gè)模塊,將以上三個(gè)模塊通過多進(jìn)程的形式運(yùn)行起來,示例如下:

TESTER_CYCLE = 20
GETTER_CYCLE = 20
TESTER_ENABLED = True
GETTER_ENABLED = True
API_ENABLED = True
 
from multiprocessing import Process
from api import app
from getter import Getter
from tester import Tester
 
class Scheduler():
    def schedule_tester(self, cycle=TESTER_CYCLE):
        """
        定時(shí)測(cè)試代理
        """
        tester = Tester()
        while True:
            print('測(cè)試器開始運(yùn)行')
            tester.run()
            time.sleep(cycle)
 
    def schedule_getter(self, cycle=GETTER_CYCLE):
        """
        定時(shí)獲取代理
        """
        getter = Getter()
        while True:
            print('開始抓取代理')
            getter.run()
            time.sleep(cycle)
 
    def schedule_api(self):
        """
        開啟API
        """
        app.run(API_HOST, API_PORT)
 
    def run(self):
        print('代理池開始運(yùn)行')
        if TESTER_ENABLED:
            tester_process = Process(target=self.schedule_tester)
            tester_process.start()
 
        if GETTER_ENABLED:
            getter_process = Process(target=self.schedule_getter)
            getter_process.start()
 
        if API_ENABLED:
            api_process = Process(target=self.schedule_api)
            api_process.start()

在這里還有三個(gè)常量,TESTER_ENABLED、GETTER_ENABLED、API_ENABLED 都是布爾類型,True 或者 False。標(biāo)明了測(cè)試模塊、獲取模塊、接口模塊的開關(guān),如果為 True,則代表模塊開啟。

啟動(dòng)入口是 run() 方法,其分別判斷了三個(gè)模塊的開關(guān),如果開啟的話,就新建一個(gè) Process 進(jìn)程,設(shè)置好啟動(dòng)目標(biāo),然后調(diào)用 start() 方法運(yùn)行,這樣三個(gè)進(jìn)程就可以并行執(zhí)行,互不干擾。

三個(gè)調(diào)度方法結(jié)構(gòu)也非常清晰,比如 schedule_tester() 方法,這是用來調(diào)度測(cè)試模塊的方法,首先聲明一個(gè) Tester 對(duì)象,然后進(jìn)入死循環(huán)不斷循環(huán)調(diào)用其 run() 方法,執(zhí)行完一輪之后就休眠一段時(shí)間,休眠結(jié)束之后重新再執(zhí)行。在這里休眠時(shí)間也定義為一個(gè)常量,如 20 秒,這樣就會(huì)每隔 20 秒進(jìn)行一次代理檢測(cè)。

最后整個(gè)代理池的運(yùn)行只需要調(diào)用 Scheduler 的 run() 方法即可啟動(dòng)。

以上便是整個(gè)代理池的架構(gòu)和相應(yīng)實(shí)現(xiàn)邏輯。

5. 運(yùn)行

接下來我們將代碼整合一下,將代理運(yùn)行起來,運(yùn)行之后的輸出結(jié)果如圖 9-2 所示:

Python3爬蟲如何維護(hù)代理池

運(yùn)行結(jié)果

以上是代理池的控制臺(tái)輸出,可以看到可用代理設(shè)置為 100,不可用代理分?jǐn)?shù)減 1。

接下來我們?cè)俅蜷_瀏覽器,當(dāng)前配置了運(yùn)行在 5555 端口,所以打開:http://127.0.0.1:5555,即可看到其首頁,如圖 9-3 所示:

Python3爬蟲如何維護(hù)代理池

圖 9-3 首頁頁面

再訪問:http://127.0.0.1:5555/random,即可獲取隨機(jī)可用代理,如圖 9-4 所示:

Python3爬蟲如何維護(hù)代理池

圖 9-4 獲取代理頁面

所以后面我們只需要訪問此接口即可獲取一個(gè)隨機(jī)可用代理,非常方便。

獲取代理的代碼如下:

import requests
 
PROXY_POOL_URL = 'http://localhost:5555/random'
 
def get_proxy():
    try:
        response = requests.get(PROXY_POOL_URL)
        if response.status_code == 200:
            return response.text
    except ConnectionError:
        return None

獲取下來之后便是一個(gè)字符串類型的代理,可以按照上一節(jié)所示的方法設(shè)置代理,如 Requests 的使用方法如下:

import requests
 
proxy = get_proxy()
proxies = {
    'http': 'http://' + proxy,
    'https': 'https://' + proxy,
}
try:
    response = requests.get('http://httpbin.org/get', proxies=proxies)
    print(response.text)
except requests.exceptions.ConnectionError as e:
    print('Error', e.args)

有了代理池之后,我們?cè)偃〕龃砑纯捎行Х乐笽P被封禁的情況。

以上就是關(guān)于Python3爬蟲如何維護(hù)代理池的內(nèi)容,如果你們有學(xué)習(xí)到知識(shí)或者技能,可以把它分享出去讓更多的人看到。

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI