溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

如何優(yōu)雅的落地一個分布式爬蟲:實戰(zhàn)篇

發(fā)布時間:2020-08-01 19:03:02 來源:網(wǎng)絡(luò) 閱讀:1346 作者:Python小咖 欄目:編程語言

本篇文章將從實戰(zhàn)角度來介紹如何構(gòu)建一個穩(wěn)健的分布式微博爬蟲。這里我沒敢談高效,抓過微博數(shù)據(jù)的同學應該都知道微博的反爬蟲能力,也知道微博數(shù)據(jù)抓取的瓶頸在哪里。我在知乎上看過一些同學的說法,把微博的數(shù)據(jù)抓取難度簡單化了,我只能說,那是你太naive,沒深入了解和長期抓取而已。

本文將會以PC端微博進行講解,因為移動端微博數(shù)據(jù)不如PC短全面,而且抓取和解析難度都會小一些。文章比較長,由于篇幅所限,文章并沒有列出所有代碼,只是講了大致流程和思路。

要抓微博數(shù)據(jù),第一步便是模擬登陸,因為很多信息(比如用戶信息,用戶主頁微博數(shù)據(jù)翻頁等各種翻頁)都需要在登錄狀態(tài)下才能查看

這里我簡單說一下,做爬蟲的同學不要老想著用什么機器學習的方法去識別復雜驗證碼,真的難度非常大,這應該也不是一個爬蟲工程師的工作重點,當然這只是我的個人建議。工程化的項目,我還是建議大家通過打碼平臺來解決驗證碼的問題。

說完模擬登陸(具體請參見我寫的那兩篇文章,篇幅所限,我就不copy過來了),我們現(xiàn)在正式進入微博的數(shù)據(jù)抓取。這里我會以微博用戶信息抓取為例來進行分析和講解。

關(guān)于用戶信息抓取,可能我們有兩個目的。一個是我們只想抓一些指定用戶,另外一個是我們想盡可能多的抓取更多數(shù)量的用戶的信息。我的目的假定是第二種。那么我們該以什么樣的策略來抓取,才能獲得盡可能多的用戶信息呢?如果我們初始用戶選擇有誤,選了一些不活躍的用戶,很可能會形成一個環(huán),這樣就抓不了太多的數(shù)據(jù)。這里有一個很簡單的思路:我們把一些大V拿來做為種子用戶,我們先抓他們的個人信息,然后再抓大V所關(guān)注的用戶和粉絲,大V關(guān)注的用戶肯定也是類似大V的用戶,這樣的話,就不容易形成環(huán)了。

策略我們都清楚了。就該是分析和編碼了。

我們先來分析如何構(gòu)造用戶信息的URL。這里我以微博名為一起神吐槽的博主為例進行分析。做爬蟲的話,一個很重要的意識就是爬蟲能抓的數(shù)據(jù)都是人能看到的數(shù)據(jù),反過來,人能在瀏覽器上看到的數(shù)據(jù),爬蟲幾乎都能抓。這里用的是幾乎,因為有的數(shù)據(jù)抓取難度特別。我們首先需要以正常人的流程看看怎么獲取到用戶的信息。我們先進入該博主的主頁,如下圖
如何優(yōu)雅的落地一個分布式爬蟲:實戰(zhàn)篇

種子用戶主頁

點擊查看更多,可以查看到該博主的具體信息

如何優(yōu)雅的落地一個分布式爬蟲:實戰(zhàn)篇

種子博主具體信息

這里我們就看到了他的具體信息了。然后,我們看該頁面的url構(gòu)造

weibo.com/p/100505175…
我直接copy的地址欄的url。這樣做有啥不好的呢?對于老鳥來說,一下就看出來了,這樣做的話,可能會導致信息不全,因為可能有些信息是動態(tài)加載的。所以,我們需要通過抓包來判斷到底微博會通過該url返回所有信息,還是需要請求一些ajax 鏈接才會返回一些關(guān)鍵信息。這里我就重復一下我的觀點:抓包很重要,抓包很重要,抓包很重要!重要的事情說三遍。

我們抓完包,發(fā)現(xiàn)并沒有ajax請求。那么可以肯定請求前面的url,會返回所有信息。我們通過點擊鼠標右鍵,查看網(wǎng)頁源代碼,然后ctrl+a、ctrl+c將所有的頁面源碼保存到本地,這里我命名為personinfo.html。我們用瀏覽器打開該文件,發(fā)現(xiàn)我們需要的所有信息都在這段源碼中,這個工作和抓包判斷數(shù)據(jù)是否全面有些重復,但是在我看來是必不可少的,因為我們解析頁面數(shù)據(jù)的時候還可以用到這個html文件,如果我們每次都通過網(wǎng)絡(luò)請求去解析內(nèi)容的話,那么可能賬號沒一會兒就會被封了(因為頻繁訪問微博信息),所以我們需要把要解析的文件保存到本地。

從上面分析中我們可以得知

weibo.com/p/100505175…
這個url就是獲取用戶數(shù)據(jù)的url。那么我們在只知道用戶id的時候怎么構(gòu)造它呢?我們可以多拿幾個用戶id來做測試,看構(gòu)造是否有規(guī)律,比如我這里以用戶名為網(wǎng)易云音樂的用戶做分析,發(fā)現(xiàn)它的用戶信息頁面構(gòu)造如下

weibo.com/1721030997/…
這個就和上面那個不同了。但是我們仔細觀察,可以發(fā)現(xiàn)上面那個是個人用戶,下面是企業(yè)微博用戶。我們嘗試一下把它們url格式都統(tǒng)一為第一種或者第二種的格式

weibo.com/1751195602/…
這樣會出現(xiàn)404,那么統(tǒng)一成上面那種呢?

weibo.com/p/100505172…
這樣子的話,它會被重定向到用戶主頁,而不是用戶詳細資料頁。所以也就不對了。那么該以什么依據(jù)判斷何時用第一種url格式,何時用第二種url格式呢?我們多翻幾個用戶,會發(fā)現(xiàn)除了100505之外,還有100305、100206等前綴,那么我猜想這個應該可以區(qū)分不同用戶。這個前綴在哪里可以得到呢?我們打開我們剛保存的頁面源碼,搜索100505,可以發(fā)現(xiàn)
如何優(yōu)雅的落地一個分布式爬蟲:實戰(zhàn)篇

domain

微博應該是根據(jù)這個來區(qū)分不同用戶類型的。這里大家可以自己也可以試試,看不同用戶的domain是否不同。為了數(shù)據(jù)能全面,我也是做了大量測試,發(fā)現(xiàn)個人用戶的domain是1005051,作家是100305,其他基本都是認證的企業(yè)號。前兩個個人信息的url構(gòu)造就是

weibo.com/p/domain+ui…
后者的是

weibo.com/uid/about
弄清楚了個人信息url的構(gòu)造方式,但是還有一個問題。我們已知只有uid啊,沒有domain啊。如果是企業(yè)號,我們通過domain=100505會被重定向到主頁,如果是作家等(domain=100305或者100306),也會被重定向主頁。我們在主頁把domain提取出來,再請求一次,不就能拿到用戶詳細信息了嗎?

關(guān)于如何構(gòu)造獲取用戶信息的url的相關(guān)分析就到這里了。因為我們是在登錄的情況下進行數(shù)據(jù)抓取的,可能在抓取的時候,某個賬號突然就被封了,或者由于網(wǎng)絡(luò)原因,某次請求失敗了,該如何處理?對于前者,我們需要判斷每次請求返回的內(nèi)容是否符合預期,也就是看response url是否正常,看response content是否是404或者讓你驗證手機號等,對于后者,我們可以做一個簡單的重試策略,大概代碼如下

@timeout_decorator
def get_page(url, user_verify=True, need_login=True):
"""
:param url: 待抓取url
:param user_verify: 是否為可能出現(xiàn)驗證碼的頁面(ajax連接不會出現(xiàn)驗證碼,如果是請求微博或者用戶信息可能出現(xiàn)驗證碼),否為抓取轉(zhuǎn)發(fā)的ajax連接
:param need_login: 抓取頁面是否需要登錄,這樣做可以減小一些賬號的壓力
:return: 返回請求的數(shù)據(jù),如果出現(xiàn)404或者403,或者是別的異常,都返回空字符串
"""
crawler.info('本次抓取的url為{url}'.format(url=url))
count = 0

while count < max_retries:

    if need_login:
        # 每次重試的時候都換cookies,并且和上次不同,如果只有一個賬號,那么就允許相同
        name_cookies = Cookies.fetch_cookies()

        if name_cookies is None:
            crawler.warning('cookie池中不存在cookie,正在檢查是否有可用賬號')
            rs = get_login_info()

            # 選擇狀態(tài)正常的賬號進行登錄,賬號都不可用就停掉celery worker
            if len(rs) == 0:
                crawler.error('賬號均不可用,請檢查賬號健康狀況')
                # 殺死所有關(guān)于celery的進程
                if 'win32' in sys.platform:
                    os.popen('taskkill /F /IM "celery*"')
                else:
                    os.popen('pkill -f "celery"')
            else:
                crawler.info('重新獲取cookie中...')
                login.excute_login_task()
                time.sleep(10)

    try:
        if need_login:
            resp = requests.get(url, headers=headers, cookies=name_cookies[1], timeout=time_out, verify=False)

            if "$CONFIG['islogin'] = '0'" in resp.text:
                crawler.warning('賬號{}出現(xiàn)異常'.format(name_cookies[0]))
                freeze_account(name_cookies[0], 0)
                Cookies.delete_cookies(name_cookies[0])
                continue
        else:
            resp = requests.get(url, headers=headers, timeout=time_out, verify=False)

        page = resp.text
        if page:
            page = page.encode('utf-8', 'ignore').decode('utf-8')
        else:
            continue

        # 每次抓取過后程序sleep的時間,降低封號危險
        time.sleep(interal)

        if user_verify:
            if 'unfreeze' in resp.url or 'accessdeny' in resp.url or 'userblock' in resp.url or is_403(page):
                crawler.warning('賬號{}已經(jīng)被凍結(jié)'.format(name_cookies[0]))
                freeze_account(name_cookies[0], 0)
                Cookies.delete_cookies(name_cookies[0])
                count += 1
                continue

            if 'verifybmobile' in resp.url:
                crawler.warning('賬號{}功能被鎖定,需要手機解鎖'.format(name_cookies[0]))

                freeze_account(name_cookies[0], -1)
                Cookies.delete_cookies(name_cookies[0])
                continue

            if not is_complete(page):
                count += 1
                continue

            if is_404(page):
                crawler.warning('url為{url}的連接不存在'.format(url=url))
                return ''

    except (requests.exceptions.ReadTimeout, requests.exceptions.ConnectionError, AttributeError) as e:
        crawler.warning('抓取{}出現(xiàn)異常,具體信息是{}'.format(url, e))
        count += 1
        time.sleep(excp_interal)

    else:
        Urls.store_crawl_url(url, 1)
        return page

crawler.warning('抓取{}已達到最大重試次數(shù),請在redis的失敗隊列中查看該url并檢查原因'.format(url))
Urls.store_crawl_url(url, 0)
return ''

這里大家把上述代碼當一段偽代碼讀就行了,主要看看如何處理抓取時候的異常。因為如果貼整個用戶抓取的代碼,不是很現(xiàn)實,代碼量有點大。

下面講頁面解析的分析。有一些做PC端微博信息抓取的同學,可能曾經(jīng)遇到過這么個問題:保存到本地的html文件打開都能看到所有信息啊,為啥在頁面源碼中找不到呢?因為PC端微博頁面的關(guān)鍵信息都是像下圖這樣,被FM.view()包裹起來的,里面的數(shù)據(jù)可能被json encode過。

如何優(yōu)雅的落地一個分布式爬蟲:實戰(zhàn)篇

標簽

那么這么多的FM.view(),我們怎么知道該提取哪個呢?這里有一個小技巧,由于只有中文會被編碼,英文還是原來的樣子,所以我們可以看哪段script中包含了渲染后的頁面中的字符,那么那段應該就可能包含所有頁面信息。我們這里以頂部的頭像為例,如圖

如何優(yōu)雅的落地一個分布式爬蟲:實戰(zhàn)篇
根據(jù)唯一性判斷

我們在頁面源碼中搜索,只發(fā)現(xiàn)一個script中有該字符串,那么就是那段script是頁面相關(guān)信息。我們可以通過正則表達式把該script提取出來,然后把其中的html也提取出來,再保存到本地,看看信息是否全面。這里我就不截圖了。感覺還有很多要寫的,不然篇幅太長了。

另外,對于具體頁面的解析,我也不做太多的介紹了。太細的東西還是建議讀讀源碼。我只講一下,我覺得的一種處理異常的比較優(yōu)雅的方式。微博爬蟲的話,主要是頁面樣式太多,如果你打算包含所有不同的用戶的模版,那么我覺得幾乎不可能,不同用戶模版,用到的解析規(guī)則就不一樣。那么出現(xiàn)解析異常如何處理?尤其是你沒有catch到的異常。很可能因為這個問題,程序就崩掉。其實對于Python這門語言來說,我們可以通過 裝飾器 來捕捉我們沒有考慮到的異常,比如我這個裝飾器

def parse_decorator(return_type):
"""
:param return_type: 用于捕捉頁面解析的異常, 0表示返回數(shù)字0, 1表示返回空字符串, 2表示返回[],3表示返回False, 4表示返回{}, 5返回None
:return: 0,'',[],False,{},None
"""
def page_parse(func):@wraps(func)
br/>@wraps(func)
keys):
try:
return func(keys)
except Exception as e:
parser.error(e)

            if return_type == 5:
                return None
            elif return_type == 4:
                return {}
            elif return_type == 3:
                return False
            elif return_type == 2:
                return []
            elif return_type == 1:
                return ''
            else:
                return 0

    return handle_error

return page_parse

上面的代碼就是處理解析頁面發(fā)生異常的情況,我們只能在數(shù)據(jù)的準確性、全面性和程序的健壯性之間做一些取舍。用裝飾器的話,程序中不用寫太多的 try語句,代碼重復率也會減少很多。

頁面的解析由于篇幅所限,我就講到這里了。沒有涉及太具體的解析,其中一個還有一個比較難的點,就是數(shù)據(jù)的全面性,讀者可以去多觀察幾個微博用戶的個人信息,就會發(fā)現(xiàn)有的個人信息,有的用戶有填寫,有的并沒有。解析的時候要考慮完的話,建議從自己的微博的個人信息入手,看到底有哪些可以填。這樣可以保證幾乎不會漏掉一些重要的信息。

最后,我再切合本文的標題,講如何搭建一個分布式的微博爬蟲。開發(fā)過程中,我們可以先就做單機單線程的爬蟲,然后再改成使用celery的方式。這里這樣做是為了方便開發(fā)和測試,因為你單機搭起來并且跑得通了,那么分布式的話,就很容易改了,因為celery的API使用本來就很簡潔。

我們抓取的是用戶信息和他的關(guān)注和粉絲uid。用戶信息的話,我們一個請求大概能抓取一個用戶的信息,而粉絲和關(guān)注我們一個請求可以抓取18個左右(因為這個抓的是列表),顯然可以發(fā)現(xiàn)用戶信息應該多占一些請求的資源。這時候就該介紹理論篇沒有介紹的關(guān)于celery的一個高級特性了,它叫做任務路由。直白點說,它可以規(guī)定哪個分布式節(jié)點能做哪些任務,不能做哪些任務。它的存在可以讓資源分配更加合理, 分布式微博爬蟲項目初期,就沒有使用任務路由,然后抓了十多萬條關(guān)注和分析,結(jié)果發(fā)現(xiàn)用戶信息抓幾萬條,這就是資源分配得不合理。那么如何進行任務路由呢?

coding:utf-8

import os
from datetime import timedelta
from celery import Celery
from kombu import Exchange, Queue
from config.conf import get_broker_or_backend
from celery import platforms

允許celery以root身份啟動

platforms.C_FORCE_ROOT = True

worker_log_path = os.path.join(os.path.dirname(os.path.dirname(file))+'/logs', 'celery.log')
beat_log_path = os.path.join(os.path.dirname(os.path.dirname(file))+'/logs', 'beat.log')

tasks = ['tasks.login', 'tasks.user']

include的作用就是注冊服務化函數(shù)

app = Celery('weibo_task', include=tasks, broker=get_broker_or_backend(1), backend=get_broker_or_backend(2))

app.conf.update(
CELERY_TIMEZONE='Asia/Shanghai',
CELERY_ENABLE_UTC=True,
CELERYD_LOG_FILE=worker_log_path,
CELERYBEAT_LOG_FILE=beat_log_path,
CELERY_ACCEPT_CONTENT=['json'],
CELERY_TASK_SERIALIZER='json',
CELERY_RESULT_SERIALIZER='json',
CELERY_QUEUES=(
Queue('login_queue', exchange=Exchange('login', type='direct'), routing_key='for_login'),
Queue('user_crawler', exchange=Exchange('user_info', type='direct'), routing_key='for_user_info'),
Queue('fans_followers', exchange=Exchange('fans_followers', type='direct'), routing_key='for_fans_followers'),
)
上述代碼我指定了有l(wèi)ogin_queue、user_crawler、fans_followers三個任務隊列。它們分別的作用是登錄、用戶信息抓取、粉絲和關(guān)注抓取。現(xiàn)在假設(shè)我有三臺爬蟲服務器A、B和C。我想讓我所有的賬號登錄任務分散到三臺服務器、讓用戶抓取在A和B上執(zhí)行,讓粉絲和關(guān)注抓取在C上執(zhí)行,那么啟動A、B、C三個服務器的celery worker的命令就分別是

celery -A tasks.workers -Q login_queue,user_crawler worker -l info -c 1 # A服務器和B服務器啟動worker的命令,它們只會執(zhí)行登錄和用戶信息抓取任務
celery -A tasks.workers -Q login_queue,fans_followers worker -l info -c 1 # C服務器啟動worker的命令,它只會執(zhí)行登錄、粉絲和關(guān)注抓取任務
然后我們通過命令行或者代碼(如下)就能發(fā)送所有任務給各個節(jié)點執(zhí)行了

coding:utf-8

from tasks.workers import app
from page_get import user as user_get
from db.seed_ids import get_seed_ids, get_seed_by_id, insert_seeds, set_seed_other_crawled

@app.task(ignore_result=True)
def crawl_follower_fans(uid):
seed = get_seed_by_id(uid)
if seed.other_crawled == 0:
rs = user_get.get_fans_or_followers_ids(uid, 1)
rs.extend(user_get.get_fans_or_followers_ids(uid, 2))
datas = set(rs)

重復數(shù)據(jù)跳過插入

    if datas:
        insert_seeds(datas)
    set_seed_other_crawled(uid)

@app.task(ignore_result=True)
def crawl_person_infos(uid):
"""
根據(jù)用戶id來爬取用戶相關(guān)資料和用戶的關(guān)注數(shù)和粉絲數(shù)(由于微博服務端限制,默認爬取前五頁,企業(yè)號的關(guān)注和粉絲也不能查看)
:param uid: 用戶id
:return:
"""
if not uid:
return

# 由于與別的任務共享數(shù)據(jù)表,所以需要先判斷數(shù)據(jù)庫是否有該用戶信息,再進行抓取
user = user_get.get_profile(uid)
# 不抓取企業(yè)號
if user.verify_type == 2:
    set_seed_other_crawled(uid)
    return
app.send_task('tasks.user.crawl_follower_fans', args=(uid,), queue='fans_followers',
              routing_key='for_fans_followers')

@app.task(ignore_result=True)
def excute_user_task():
seeds = get_seed_ids()
if seeds:
for seed in seeds:

在send_task的時候指定任務隊列

        app.send_task('tasks.user.crawl_person_infos', args=(seed.uid,), queue='user_crawler',
                      routing_key='for_user_info')

這里我們是通過 queue='user_crawler',routing_key='for_user_info'來將任務和worker進行關(guān)聯(lián)的。

關(guān)于celery任務路由的更詳細的資料請閱讀官方文檔。

到這里,基本把微博信息抓取的過程和分布式進行抓取的過程都講完了,具體實現(xiàn)分布式的方法,可以讀讀基礎(chǔ)篇。由于代碼量比較大,我并沒有貼上完整的代碼,只講了要點。分析過程是講的抓取過程的分析和頁面解析的分析,并在最后,結(jié)合分布式,講了一下使用任務隊列來讓分布式爬蟲更加靈活和可擴展。

如果有同學想跟著做一遍,可能需要參考分布式微博爬蟲的源碼,自己動手實現(xiàn)一下,或者跑一下,印象可能會更加深刻。

向AI問一下細節(jié)

免責聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI